This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e74fccb8f8 HDDS-7907. [hsync] KeyOutputStream is not thread safe. (#4601)
e74fccb8f8 is described below
commit e74fccb8f8bfbe18cfb7b6da61cfc3da98fd0421
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Mon Apr 24 17:09:55 2023 -0700
HDDS-7907. [hsync] KeyOutputStream is not thread safe. (#4601)
---
.../hadoop/ozone/client/io/KeyOutputStream.java | 15 +++--
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 77 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 7 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 1425ab837b..c88c61c2f3 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -179,13 +179,13 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
* @param openVersion the version corresponding to the pre-allocation.
* @throws IOException
*/
- public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+ public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version,
long openVersion) throws IOException {
blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
}
@Override
- public void write(int b) throws IOException {
+ public synchronized void write(int b) throws IOException {
byte[] buf = new byte[1];
buf[0] = (byte) b;
write(buf, 0, 1);
@@ -204,7 +204,7 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
* @throws IOException
*/
@Override
- public void write(byte[] b, int off, int len)
+ public synchronized void write(byte[] b, int off, int len)
throws IOException {
checkNotClosed();
if (b == null) {
@@ -444,7 +444,7 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
}
@Override
- public void flush() throws IOException {
+ public synchronized void flush() throws IOException {
checkNotClosed();
handleFlushOrClose(StreamAction.FLUSH);
}
@@ -455,7 +455,7 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
}
@Override
- public void hsync() throws IOException {
+ public synchronized void hsync() throws IOException {
if (replication.getReplicationType() != ReplicationType.RATIS) {
throw new UnsupportedOperationException(
"Replication type is not " + ReplicationType.RATIS);
@@ -546,7 +546,7 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
* @throws IOException
*/
@Override
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
if (closed) {
return;
}
@@ -562,7 +562,8 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
}
}
- public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+ public synchronized OmMultipartCommitUploadPartInfo
+ getCommitUploadPartInfo() {
return blockOutputStreamEntryPool.getCommitUploadPartInfo();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 3bc207738e..5d973dfd1b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.conf.StorageUnit;
@@ -52,6 +53,7 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.util.Time;
import org.apache.ozone.test.tag.Flaky;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -229,6 +231,81 @@ public class TestHSync {
assertEquals(data.length, offset);
}
+ private void runConcurrentWriteHSync(FileSystem fs, Path file,
+ final FSDataOutputStream out, int initialDataSize)
+ throws InterruptedException, IOException {
+ final byte[] data = new byte[initialDataSize]; // payload reused by every write
+ ThreadLocalRandom.current().nextBytes(data);
+
+ AtomicReference<IOException> writerException = new AtomicReference<>();
+ AtomicReference<IOException> syncerException = new AtomicReference<>();
+
+ LOG.info("runConcurrentWriteHSync {} with size {}",
+ file, initialDataSize);
+
+ final long start = Time.monotonicNow(); // shared start time for both loops
+ // Two threads share the same stream: one calls write(), the other hsync().
+ Runnable writer = () -> {
+ while ((Time.monotonicNow() - start < 10000)) { // presumably ms — confirm
+ try {
+ out.write(data);
+ } catch (IOException e) { // NOTE(review): only IOException is recorded
+ writerException.set(e); // handed back to the calling thread below
+ throw new RuntimeException(e); // ends this worker's loop
+ }
+ }
+ };
+
+ Runnable syncer = () -> {
+ while ((Time.monotonicNow() - start < 10000)) { // same window as the writer
+ try {
+ out.hsync();
+ } catch (IOException e) { // NOTE(review): only IOException is recorded
+ syncerException.set(e); // handed back to the calling thread below
+ throw new RuntimeException(e); // ends this worker's loop
+ }
+ }
+ };
+
+ Thread writerThread = new Thread(writer);
+ writerThread.start();
+ Thread syncThread = new Thread(syncer);
+ syncThread.start();
+ writerThread.join(); // wait for both workers before checking for failures
+ syncThread.join();
+
+ if (writerException.get() != null) { // writer failure is reported first
+ throw writerException.get();
+ }
+ if (syncerException.get() != null) {
+ throw syncerException.get();
+ }
+ }
+
+ @Test
+ public void testConcurrentWriteHSync()
+ throws IOException, InterruptedException {
+ final String rootPath = String.format("%s://%s/",
+ OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+ CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ final String dir = OZONE_ROOT + bucket.getVolumeName()
+ + OZONE_URI_DELIMITER + bucket.getName(); // files go under this bucket
+
+ try (FileSystem fs = FileSystem.get(CONF)) {
+ for (int i = 0; i < 10; i++) { // ten rounds, one file per payload size
+ final Path file = new Path(dir, "file" + i);
+ try (FSDataOutputStream out =
+ fs.create(file, true)) { // 'true' is presumably overwrite — confirm
+ int initialDataSize = 1 << i; // 1, 2, 4, ..., 512 bytes per write
+ runConcurrentWriteHSync(fs, file, out, initialDataSize);
+ }
+
+ fs.delete(file, false); // runs after the stream above has been closed
+ }
+ }
+ }
+
@Test
public void testStreamCapability() throws Exception {
final String rootPath = String.format("%s://%s/",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]