This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 438799b HDDS-5953: EC: Review the current flush API and clean up
(#2828)
438799b is described below
commit 438799ba7532895f2742be0948b8ff0ca686e0a5
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Thu Nov 11 23:51:31 2021 -0800
HDDS-5953: EC: Review the current flush API and clean up (#2828)
Co-authored-by: Uma Maheswara Rao G <[email protected]>
---
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 83 ++--------------------
.../hadoop/ozone/client/TestOzoneECClient.java | 16 +++++
2 files changed, 23 insertions(+), 76 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index ed4b96e..24b79c7 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -20,12 +20,11 @@ package org.apache.hadoop.ozone.client.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -66,12 +65,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
private long currentBlockGroupLen = 0;
- /**
- * Defines stream action while calling handleFlushOrClose.
- */
- enum StreamAction {
- FLUSH, CLOSE, FULL
- }
public static final Logger LOG =
LoggerFactory.getLogger(KeyOutputStream.class);
@@ -413,22 +406,12 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
@Override
- public void flush() throws IOException {
- checkNotClosed();
- handleFlushOrClose(StreamAction.FLUSH);
+ public void flush() {
+ throw new NotImplementedException("The flush API is not implemented yet.");
}
- /**
- * Close or Flush the latest outputStream depending upon the action.
- * This function gets called when while write is going on, the current stream
- * gets full or explicit flush or close request is made by client.
- *
- * @param op Flag which decides whether to call close or flush on the
- * outputStream.
- * @throws IOException In case, flush or close fails with exception.
- */
- @SuppressWarnings("squid:S1141")
- private void handleFlushOrClose(StreamAction op) throws IOException {
+ private void closeCurrentStreamEntry()
+ throws IOException {
if (!blockOutputStreamEntryPool.isEmpty()) {
while (true) {
try {
@@ -436,7 +419,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
blockOutputStreamEntryPool.getCurrentStreamEntry();
if (entry != null) {
try {
- handleStreamAction(entry, op);
+ entry.close();
} catch (IOException ioe) {
handleException(entry, ioe);
continue;
@@ -451,58 +434,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
}
- private void handleFlushOrCloseAllStreams(StreamAction op)
- throws IOException {
- if (!blockOutputStreamEntryPool.isEmpty()) {
- List<BlockOutputStreamEntry> allStreamEntries =
- blockOutputStreamEntryPool.getStreamEntries();
- for (int i = 0; i < allStreamEntries.size(); i++) {
- while (true) {
- try {
- BlockOutputStreamEntry entry = allStreamEntries.get(i);
- if (entry != null) {
- try {
- handleStreamAction(entry, op);
- } catch (IOException ioe) {
- handleException(entry, ioe);
- continue;
- }
- }
- return;
- } catch (Exception e) {
- markStreamClosed();
- throw e;
- }
- }
- }
- }
- }
-
- private void handleStreamAction(BlockOutputStreamEntry entry, StreamAction
op)
- throws IOException {
- Collection<DatanodeDetails> failedServers = entry.getFailedServers();
- // failed servers can be null in case there is no data written in
- // the stream
- if (!failedServers.isEmpty()) {
- blockOutputStreamEntryPool.getExcludeList().addDatanodes(failedServers);
- }
- switch (op) {
- case CLOSE:
- entry.close();
- break;
- case FULL:
- if (entry.getRemaining() == 0) {
- entry.close();
- }
- break;
- case FLUSH:
- entry.flush();
- break;
- default:
- throw new IOException("Invalid Operation");
- }
- }
-
/**
* Commit the key to OM, this will add the blocks as the new key blocks.
*
@@ -546,7 +477,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
}
- handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+ closeCurrentStreamEntry();
Preconditions.checkArgument(writeOffset == offset);
blockOutputStreamEntryPool.getCurrentStreamEntry().close();
blockOutputStreamEntryPool.commitKey(offset);
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 9b2bb3c..8cd4365 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -488,6 +489,21 @@ public class TestOzoneECClient {
}
}
+ @Test(expected = NotImplementedException.class)
+ public void testFlushShouldThrowNotImplementedException() throws IOException
{
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, 1024 * 3,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize), new HashMap<>())) {
+ out.write(inputChunks[0]); // Just write some content.
+ out.flush();
+ }
+ }
+
private OzoneBucket writeIntoECKey(byte[] data, String key,
DefaultReplicationConfig defaultReplicationConfig) throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]