This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 438799b  HDDS-5953: EC: Review the current flush API and clean up (#2828)
438799b is described below

commit 438799ba7532895f2742be0948b8ff0ca686e0a5
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Thu Nov 11 23:51:31 2021 -0800

    HDDS-5953: EC: Review the current flush API and clean up (#2828)
    
    Co-authored-by: Uma Maheswara Rao G <[email protected]>
---
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 83 ++--------------------
 .../hadoop/ozone/client/TestOzoneECClient.java     | 16 +++++
 2 files changed, 23 insertions(+), 76 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index ed4b96e..24b79c7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -20,12 +20,11 @@ package org.apache.hadoop.ozone.client.io;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -66,12 +65,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
   }
 
   private long currentBlockGroupLen = 0;
-  /**
-   * Defines stream action while calling handleFlushOrClose.
-   */
-  enum StreamAction {
-    FLUSH, CLOSE, FULL
-  }
 
   public static final Logger LOG =
       LoggerFactory.getLogger(KeyOutputStream.class);
@@ -413,22 +406,12 @@ public class ECKeyOutputStream extends KeyOutputStream {
   }
 
   @Override
-  public void flush() throws IOException {
-    checkNotClosed();
-    handleFlushOrClose(StreamAction.FLUSH);
+  public void flush() {
+    throw new NotImplementedException("The flush API is not implemented yet.");
   }
 
-  /**
-   * Close or Flush the latest outputStream depending upon the action.
-   * This function gets called when while write is going on, the current stream
-   * gets full or explicit flush or close request is made by client.
-   *
-   * @param op Flag which decides whether to call close or flush on the
-   *           outputStream.
-   * @throws IOException In case, flush or close fails with exception.
-   */
-  @SuppressWarnings("squid:S1141")
-  private void handleFlushOrClose(StreamAction op) throws IOException {
+  private void closeCurrentStreamEntry()
+      throws IOException {
     if (!blockOutputStreamEntryPool.isEmpty()) {
       while (true) {
         try {
@@ -436,7 +419,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
               blockOutputStreamEntryPool.getCurrentStreamEntry();
           if (entry != null) {
             try {
-              handleStreamAction(entry, op);
+              entry.close();
             } catch (IOException ioe) {
               handleException(entry, ioe);
               continue;
@@ -451,58 +434,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
     }
   }
 
-  private void handleFlushOrCloseAllStreams(StreamAction op)
-      throws IOException {
-    if (!blockOutputStreamEntryPool.isEmpty()) {
-      List<BlockOutputStreamEntry> allStreamEntries =
-          blockOutputStreamEntryPool.getStreamEntries();
-      for (int i = 0; i < allStreamEntries.size(); i++) {
-        while (true) {
-          try {
-            BlockOutputStreamEntry entry = allStreamEntries.get(i);
-            if (entry != null) {
-              try {
-                handleStreamAction(entry, op);
-              } catch (IOException ioe) {
-                handleException(entry, ioe);
-                continue;
-              }
-            }
-            return;
-          } catch (Exception e) {
-            markStreamClosed();
-            throw e;
-          }
-        }
-      }
-    }
-  }
-
-  private void handleStreamAction(BlockOutputStreamEntry entry, StreamAction op)
-      throws IOException {
-    Collection<DatanodeDetails> failedServers = entry.getFailedServers();
-    // failed servers can be null in case there is no data written in
-    // the stream
-    if (!failedServers.isEmpty()) {
-      blockOutputStreamEntryPool.getExcludeList().addDatanodes(failedServers);
-    }
-    switch (op) {
-    case CLOSE:
-      entry.close();
-      break;
-    case FULL:
-      if (entry.getRemaining() == 0) {
-        entry.close();
-      }
-      break;
-    case FLUSH:
-      entry.flush();
-      break;
-    default:
-      throw new IOException("Invalid Operation");
-    }
-  }
-
   /**
    * Commit the key to OM, this will add the blocks as the new key blocks.
    *
@@ -546,7 +477,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
         }
       }
 
-      handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+      closeCurrentStreamEntry();
       Preconditions.checkArgument(writeOffset == offset);
       blockOutputStreamEntryPool.getCurrentStreamEntry().close();
       blockOutputStreamEntryPool.commitKey(offset);
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 9b2bb3c..8cd4365 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.client;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -488,6 +489,21 @@ public class TestOzoneECClient {
     }
   }
 
+  @Test(expected = NotImplementedException.class)
+  public void testFlushShouldThrowNotImplementedException() throws IOException {
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, 1024 * 3,
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+            chunkSize), new HashMap<>())) {
+      out.write(inputChunks[0]); // Just write some content.
+      out.flush();
+    }
+  }
+
 
   private OzoneBucket writeIntoECKey(byte[] data, String key,
       DefaultReplicationConfig defaultReplicationConfig) throws IOException {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to