This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e74fccb8f8 HDDS-7907. [hsync] KeyOutputStream is not thread safe. (#4601)
e74fccb8f8 is described below

commit e74fccb8f8bfbe18cfb7b6da61cfc3da98fd0421
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Mon Apr 24 17:09:55 2023 -0700

    HDDS-7907. [hsync] KeyOutputStream is not thread safe. (#4601)
---
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 15 +++--
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 77 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 7 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 1425ab837b..c88c61c2f3 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -179,13 +179,13 @@ public class KeyOutputStream extends OutputStream implements Syncable {
    * @param openVersion the version corresponding to the pre-allocation.
    * @throws IOException
    */
-  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+  public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version,
       long openVersion) throws IOException {
     blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
   }
 
   @Override
-  public void write(int b) throws IOException {
+  public synchronized void write(int b) throws IOException {
     byte[] buf = new byte[1];
     buf[0] = (byte) b;
     write(buf, 0, 1);
@@ -204,7 +204,7 @@ public class KeyOutputStream extends OutputStream implements Syncable {
    * @throws IOException
    */
   @Override
-  public void write(byte[] b, int off, int len)
+  public synchronized void write(byte[] b, int off, int len)
       throws IOException {
     checkNotClosed();
     if (b == null) {
@@ -444,7 +444,7 @@ public class KeyOutputStream extends OutputStream implements Syncable {
   }
 
   @Override
-  public void flush() throws IOException {
+  public synchronized void flush() throws IOException {
     checkNotClosed();
     handleFlushOrClose(StreamAction.FLUSH);
   }
@@ -455,7 +455,7 @@ public class KeyOutputStream extends OutputStream implements Syncable {
   }
 
   @Override
-  public void hsync() throws IOException {
+  public synchronized void hsync() throws IOException {
     if (replication.getReplicationType() != ReplicationType.RATIS) {
       throw new UnsupportedOperationException(
           "Replication type is not " + ReplicationType.RATIS);
@@ -546,7 +546,7 @@ public class KeyOutputStream extends OutputStream implements Syncable {
    * @throws IOException
    */
   @Override
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
     if (closed) {
       return;
     }
@@ -562,7 +562,8 @@ public class KeyOutputStream extends OutputStream implements Syncable {
     }
   }
 
-  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+  public synchronized OmMultipartCommitUploadPartInfo
+      getCommitUploadPartInfo() {
     return blockOutputStreamEntryPool.getCommitUploadPartInfo();
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 3bc207738e..5d973dfd1b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.conf.StorageUnit;
@@ -52,6 +53,7 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 
+import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.tag.Flaky;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -229,6 +231,81 @@ public class TestHSync {
     assertEquals(data.length, offset);
   }
 
+  private void runConcurrentWriteHSync(FileSystem fs, Path file,
+      final FSDataOutputStream out, int initialDataSize)
+      throws InterruptedException, IOException {
+    final byte[] data = new byte[initialDataSize];
+    ThreadLocalRandom.current().nextBytes(data);
+
+    AtomicReference<IOException> writerException = new AtomicReference<>();
+    AtomicReference<IOException> syncerException = new AtomicReference<>();
+
+    LOG.info("runConcurrentWriteHSync {} with size {}",
+        file, initialDataSize);
+
+    final long start = Time.monotonicNow();
+    // two threads: write and hsync
+    Runnable writer = () -> {
+      while ((Time.monotonicNow() - start < 10000)) {
+        try {
+          out.write(data);
+        } catch (IOException e) {
+          writerException.set(e);
+          throw new RuntimeException(e);
+        }
+      }
+    };
+
+    Runnable syncer = () -> {
+      while ((Time.monotonicNow() - start < 10000)) {
+        try {
+          out.hsync();
+        } catch (IOException e) {
+          syncerException.set(e);
+          throw new RuntimeException(e);
+        }
+      }
+    };
+
+    Thread writerThread = new Thread(writer);
+    writerThread.start();
+    Thread syncThread = new Thread(syncer);
+    syncThread.start();
+    writerThread.join();
+    syncThread.join();
+
+    if (writerException.get() != null) {
+      throw writerException.get();
+    }
+    if (syncerException.get() != null) {
+      throw syncerException.get();
+    }
+  }
+
+  @Test
+  public void testConcurrentWriteHSync()
+      throws IOException, InterruptedException {
+    final String rootPath = String.format("%s://%s/",
+        OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+    CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final String dir = OZONE_ROOT + bucket.getVolumeName()
+        + OZONE_URI_DELIMITER + bucket.getName();
+
+    try (FileSystem fs = FileSystem.get(CONF)) {
+      for (int i = 0; i < 10; i++) {
+        final Path file = new Path(dir, "file" + i);
+        try (FSDataOutputStream out =
+            fs.create(file, true)) {
+          int initialDataSize = 1 << i;
+          runConcurrentWriteHSync(fs, file, out, initialDataSize);
+        }
+
+        fs.delete(file, false);
+      }
+    }
+  }
+
   @Test
   public void testStreamCapability() throws Exception {
     final String rootPath = String.format("%s://%s/",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to