This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 97babb3047 HDDS-11184. [hsync] Add a client config to limit write concurrency on the same key (#7073)
97babb3047 is described below

commit 97babb30474581577c75b1240b5063cf91f7b535
Author: Siyao Meng <[email protected]>
AuthorDate: Thu Aug 15 22:48:40 2024 -0700

    HDDS-11184. [hsync] Add a client config to limit write concurrency on the same key (#7073)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |  16 +++
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 111 +++++++++++-------
 .../ozone/client/io/KeyOutputStreamSemaphore.java  |  71 ++++++++++++
 .../ozone/client/io/TestKeyOutputStream.java       | 126 +++++++++++++++++++++
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java |   2 +
 5 files changed, 286 insertions(+), 40 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index b130f48776..bbc52bfe4f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -264,6 +264,14 @@ public class OzoneClientConfig {
           tags = ConfigTag.CLIENT)
   private boolean enablePutblockPiggybacking = false;
 
+  @Config(key = "key.write.concurrency",
+      defaultValue = "1",
+      description = "Maximum concurrent writes allowed on each key. " +
+          "Defaults to 1 which matches the behavior before HDDS-9844. " +
+          "For unlimited write concurrency, set this to -1 or any negative integer value. 0 is not allowed.",
+      tags = ConfigTag.CLIENT)
+  private int maxConcurrentWritePerKey = 1;
+
   @PostConstruct
   public void validate() {
     Preconditions.checkState(streamBufferSize > 0);
@@ -485,4 +493,12 @@ public class OzoneClientConfig {
   public boolean getIncrementalChunkList() {
     return this.incrementalChunkList;
   }
+
+  public void setMaxConcurrentWritePerKey(int maxConcurrentWritePerKey) {
+    this.maxConcurrentWritePerKey = maxConcurrentWritePerKey;
+  }
+
+  public int getMaxConcurrentWritePerKey() {
+    return this.maxConcurrentWritePerKey;
+  }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 59c0fa134a..549607c59a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -110,6 +110,13 @@ public class KeyOutputStream extends OutputStream
   private ContainerClientMetrics clientMetrics;
   private OzoneManagerVersion ozoneManagerVersion;
 
+  private final int maxConcurrentWritePerKey;
+  private final KeyOutputStreamSemaphore keyOutputStreamSemaphore;
+
+  KeyOutputStreamSemaphore getRequestSemaphore() {
+    return keyOutputStreamSemaphore;
+  }
+
   public KeyOutputStream(ReplicationConfig replicationConfig, BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
     this.replication = replicationConfig;
     closed = false;
@@ -120,6 +127,10 @@ public class KeyOutputStream extends OutputStream
     retryCount = 0;
     offset = 0;
     this.blockOutputStreamEntryPool = blockOutputStreamEntryPool;
+    // Force write concurrency to 1 per key when using this constructor.
+    // At the moment, this constructor is only used by ECKeyOutputStream.
+    this.maxConcurrentWritePerKey = 1;
+    this.keyOutputStreamSemaphore = new KeyOutputStreamSemaphore(maxConcurrentWritePerKey);
   }
 
   protected BlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
@@ -150,6 +161,8 @@ public class KeyOutputStream extends OutputStream
     this.replication = b.replicationConfig;
     this.blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(b);
     final OzoneClientConfig config = b.getClientConfig();
+    this.maxConcurrentWritePerKey = config.getMaxConcurrentWritePerKey();
+    this.keyOutputStreamSemaphore = new KeyOutputStreamSemaphore(maxConcurrentWritePerKey);
     this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval());
     this.retryCount = 0;
@@ -179,7 +192,7 @@ public class KeyOutputStream extends OutputStream
   }
 
   @Override
-  public synchronized void write(int b) throws IOException {
+  public void write(int b) throws IOException {
     byte[] buf = new byte[1];
     buf[0] = (byte) b;
     write(buf, 0, 1);
@@ -200,24 +213,31 @@ public class KeyOutputStream extends OutputStream
   @Override
   public void write(byte[] b, int off, int len)
       throws IOException {
-    checkNotClosed();
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
-        || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return;
-    }
-    synchronized (this) {
-      handleWrite(b, off, len, false);
-      writeOffset += len;
+    getRequestSemaphore().acquire(); // acquire before try: finally must only release a held permit
+    try {
+      checkNotClosed();
+      if (b == null) {
+        throw new NullPointerException();
+      }
+      if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+          || ((off + len) < 0)) {
+        throw new IndexOutOfBoundsException();
+      }
+      if (len == 0) {
+        return;
+      }
+
+      synchronized (this) {
+        handleWrite(b, off, len, false);
+        writeOffset += len;
+      }
+    } finally {
+      getRequestSemaphore().release();
     }
   }
 
-  private void handleWrite(byte[] b, int off, long len, boolean retry)
+  @VisibleForTesting
+  void handleWrite(byte[] b, int off, long len, boolean retry)
       throws IOException {
     while (len > 0) {
       try {
@@ -441,40 +461,51 @@ public class KeyOutputStream extends OutputStream
 
   @Override
   public void flush() throws IOException {
-    checkNotClosed();
-    handleFlushOrClose(StreamAction.FLUSH);
+    getRequestSemaphore().acquire(); // acquire before try: finally must only release a held permit
+    try {
+      checkNotClosed();
+      handleFlushOrClose(StreamAction.FLUSH);
+    } finally {
+      getRequestSemaphore().release();
+    }
   }
 
   @Override
   public void hflush() throws IOException {
+    // Note: Semaphore acquired and released inside hsync().
     hsync();
   }
 
   @Override
   public void hsync() throws IOException {
-    if (replication.getReplicationType() != ReplicationType.RATIS) {
-      throw new UnsupportedOperationException(
-          "Replication type is not " + ReplicationType.RATIS);
-    }
-    if (replication.getRequiredNodes() <= 1) {
-      throw new UnsupportedOperationException("The replication factor = "
-          + replication.getRequiredNodes() + " <= 1");
-    }
-    if (ozoneManagerVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
-      throw new UnsupportedOperationException("Hsync API requires OM version "
-          + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version "
-          + ozoneManagerVersion);
-    }
-    checkNotClosed();
-    final long hsyncPos = writeOffset;
-
-    handleFlushOrClose(StreamAction.HSYNC);
+    getRequestSemaphore().acquire(); // acquire before try: finally must only release a held permit
 
-    synchronized (this) {
-      Preconditions.checkState(offset >= hsyncPos,
-          "offset = %s < hsyncPos = %s", offset, hsyncPos);
-      MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
-          () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
+    try {
+      if (replication.getReplicationType() != ReplicationType.RATIS) {
+        throw new UnsupportedOperationException(
+            "Replication type is not " + ReplicationType.RATIS);
+      }
+      if (replication.getRequiredNodes() <= 1) {
+        throw new UnsupportedOperationException("The replication factor = "
+            + replication.getRequiredNodes() + " <= 1");
+      }
+      if (ozoneManagerVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
+        throw new UnsupportedOperationException("Hsync API requires OM version "
+            + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version "
+            + ozoneManagerVersion);
+      }
+      checkNotClosed();
+      final long hsyncPos = writeOffset;
+      handleFlushOrClose(StreamAction.HSYNC);
+
+      synchronized (this) {
+        Preconditions.checkState(offset >= hsyncPos,
+            "offset = %s < hsyncPos = %s", offset, hsyncPos);
+        MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
+            () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
+      }
+    } finally {
+      getRequestSemaphore().release();
     }
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStreamSemaphore.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStreamSemaphore.java
new file mode 100644
index 0000000000..36031a9cf4
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStreamSemaphore.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Bounds concurrent requests on a KeyOutputStream; a negative limit means unlimited.
+ */
+public class KeyOutputStreamSemaphore {
+
+  public static final Logger LOG = LoggerFactory.getLogger(KeyOutputStreamSemaphore.class);
+  private final Semaphore requestSemaphore; // null when concurrency is unlimited
+
+  KeyOutputStreamSemaphore(int maxConcurrentWritePerKey) {
+    LOG.debug("Initializing semaphore with maxConcurrentWritePerKey = {}", maxConcurrentWritePerKey);
+    if (maxConcurrentWritePerKey > 0) {
+      requestSemaphore = new Semaphore(maxConcurrentWritePerKey);
+    } else if (maxConcurrentWritePerKey == 0) {
+      throw new IllegalArgumentException("Invalid config. ozone.client.key.write.concurrency cannot be set to 0");
+    } else {
+      requestSemaphore = null; // negative limit: acquire()/release() become no-ops
+    }
+  }
+
+  public int getQueueLength() { // estimated number of threads blocked in acquire(); 0 if unlimited
+    return requestSemaphore != null ? requestSemaphore.getQueueLength() : 0;
+  }
+
+  public void acquire() throws IOException {
+    if (requestSemaphore != null) {
+      try {
+        LOG.debug("Acquiring semaphore");
+        requestSemaphore.acquire();
+        LOG.debug("Acquired semaphore");
+      } catch (InterruptedException e) {
+        final String errMsg = "Write aborted. Interrupted waiting for KeyOutputStream semaphore: " + e.getMessage();
+        LOG.error(errMsg);
+        Thread.currentThread().interrupt();
+        throw (InterruptedIOException) new InterruptedIOException(errMsg).initCause(e);
+      }
+    }
+  }
+
+  public void release() { // callers must only release a permit they successfully acquired
+    if (requestSemaphore != null) {
+      LOG.debug("Releasing semaphore");
+      requestSemaphore.release();
+      LOG.debug("Released semaphore");
+    }
+  }
+}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
new file mode 100644
index 0000000000..6b6abceff3
--- /dev/null
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests KeyOutputStream.
+ * This is a unit test meant to verify specific behaviors of KeyOutputStream.
+ */
+public class TestKeyOutputStream {
+
+  @BeforeAll
+  static void init() {
+    GenericTestUtils.setLogLevel(KeyOutputStreamSemaphore.LOG, Level.DEBUG);
+  }
+
+  @Test
+  void testConcurrentWriteLimitOne() throws Exception {
+    // Verify the semaphore is working to limit the number of concurrent writes allowed.
+    KeyOutputStreamSemaphore sema1 = new KeyOutputStreamSemaphore(1);
+    KeyOutputStream keyOutputStream = mock(KeyOutputStream.class);
+    when(keyOutputStream.getRequestSemaphore()).thenReturn(sema1);
+
+    final AtomicInteger countWrite = new AtomicInteger(0);
+    // mock write()
+    doAnswer(invocation -> {
+      countWrite.getAndIncrement();
+      return invocation.callRealMethod();
+    }).when(keyOutputStream).write(any(), anyInt(), anyInt());
+
+    final ConcurrentHashMap<Long, CountDownLatch> mapNotifiers = new ConcurrentHashMap<>();
+
+    final AtomicInteger countHandleWrite = new AtomicInteger(0);
+    // mock handleWrite()
+    doAnswer(invocation -> {
+      final long tid = Thread.currentThread().getId();
+      System.out.println("handleWrite() called from tid " + tid);
+      final CountDownLatch latch = mapNotifiers.compute(tid, (k, v) ->
+          v != null ? v : new CountDownLatch(1));
+      countHandleWrite.getAndIncrement();
+      // doing some "work"
+      latch.await();
+      return null;
+    }).when(keyOutputStream).handleWrite(any(), anyInt(), anyLong(), anyBoolean());
+
+    final Runnable writeRunnable = () -> {
+      try {
+        keyOutputStream.write(new byte[4], 0, 4);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+    final Thread thread1 = new Thread(writeRunnable);
+    thread1.start();
+
+    final Thread thread2 = new Thread(writeRunnable);
+    thread2.start();
+
+    // Wait for both threads to enter write()
+    GenericTestUtils.waitFor(() -> countWrite.get() == 2, 100, 3000);
+    // One thread should enter handleWrite()
+    GenericTestUtils.waitFor(() -> countHandleWrite.get() == 1, 100, 3000);
+    // The other thread is waiting on the semaphore
+    GenericTestUtils.waitFor(() -> sema1.getQueueLength() == 1, 100, 3000);
+
+    // handleWrite is triggered only once because of the semaphore and the synchronized block
+    verify(keyOutputStream, times(1)).handleWrite(any(), anyInt(), anyLong(), anyBoolean());
+
+    // Now, allow the current thread to finish handleWrite
+    // There is only one thread in handleWrite() so mapNotifiers should have only one entry.
+    assertEquals(1, mapNotifiers.size());
+    Entry<Long, CountDownLatch> entry = mapNotifiers.entrySet().stream().findFirst().get();
+    mapNotifiers.remove(entry.getKey());
+    entry.getValue().countDown();
+
+    // Wait for the other thread to proceed
+    GenericTestUtils.waitFor(() -> countHandleWrite.get() == 2, 100, 3000);
+    verify(keyOutputStream, times(2)).handleWrite(any(), anyInt(), anyLong(), anyBoolean());
+
+    // Allow the other thread to finish handleWrite
+    entry = mapNotifiers.entrySet().stream().findFirst().get();
+    mapNotifiers.remove(entry.getKey());
+    entry.getValue().countDown();
+
+    // Let threads finish
+    thread2.join();
+    thread1.join();
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 98d7388310..91792da76f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -175,6 +175,8 @@ public class TestHSync {
     CONF.setTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
     CONF.setBoolean("ozone.client.incremental.chunk.list", true);
     CONF.setBoolean("ozone.client.stream.putblock.piggybacking", true);
+    // Unlimited key write concurrency
+    CONF.setInt("ozone.client.key.write.concurrency", -1);
     CONF.setTimeDuration(OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL,
         SERVICE_INTERVAL, TimeUnit.MILLISECONDS);
     CONF.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to