This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 97babb3047 HDDS-11184. [hsync] Add a client config to limit write concurrency on the same key (#7073)
97babb3047 is described below
commit 97babb30474581577c75b1240b5063cf91f7b535
Author: Siyao Meng <[email protected]>
AuthorDate: Thu Aug 15 22:48:40 2024 -0700
HDDS-11184. [hsync] Add a client config to limit write concurrency on the
same key (#7073)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 16 +++
.../hadoop/ozone/client/io/KeyOutputStream.java | 111 +++++++++++-------
.../ozone/client/io/KeyOutputStreamSemaphore.java | 71 ++++++++++++
.../ozone/client/io/TestKeyOutputStream.java | 126 +++++++++++++++++++++
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 2 +
5 files changed, 286 insertions(+), 40 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index b130f48776..bbc52bfe4f 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -264,6 +264,14 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private boolean enablePutblockPiggybacking = false;
+ @Config(key = "key.write.concurrency",
+ defaultValue = "1",
+ description = "Maximum concurrent writes allowed on each key. " +
+ "Defaults to 1 which matches the behavior before HDDS-9844. " +
+          "For unlimited write concurrency, set this to -1 or any negative integer value.",
+ tags = ConfigTag.CLIENT)
+ private int maxConcurrentWritePerKey = 1;
+
@PostConstruct
public void validate() {
Preconditions.checkState(streamBufferSize > 0);
@@ -485,4 +493,12 @@ public class OzoneClientConfig {
public boolean getIncrementalChunkList() {
return this.incrementalChunkList;
}
+
+ public void setMaxConcurrentWritePerKey(int maxConcurrentWritePerKey) {
+ this.maxConcurrentWritePerKey = maxConcurrentWritePerKey;
+ }
+
+ public int getMaxConcurrentWritePerKey() {
+ return this.maxConcurrentWritePerKey;
+ }
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 59c0fa134a..549607c59a 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -110,6 +110,13 @@ public class KeyOutputStream extends OutputStream
private ContainerClientMetrics clientMetrics;
private OzoneManagerVersion ozoneManagerVersion;
+ private final int maxConcurrentWritePerKey;
+ private final KeyOutputStreamSemaphore keyOutputStreamSemaphore;
+
+ KeyOutputStreamSemaphore getRequestSemaphore() {
+ return keyOutputStreamSemaphore;
+ }
+
public KeyOutputStream(ReplicationConfig replicationConfig,
BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
this.replication = replicationConfig;
closed = false;
@@ -120,6 +127,10 @@ public class KeyOutputStream extends OutputStream
retryCount = 0;
offset = 0;
this.blockOutputStreamEntryPool = blockOutputStreamEntryPool;
+ // Force write concurrency to 1 per key when using this constructor.
+ // At the moment, this constructor is only used by ECKeyOutputStream.
+ this.maxConcurrentWritePerKey = 1;
+    this.keyOutputStreamSemaphore = new KeyOutputStreamSemaphore(maxConcurrentWritePerKey);
}
protected BlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
@@ -150,6 +161,8 @@ public class KeyOutputStream extends OutputStream
this.replication = b.replicationConfig;
this.blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(b);
final OzoneClientConfig config = b.getClientConfig();
+ this.maxConcurrentWritePerKey = config.getMaxConcurrentWritePerKey();
+    this.keyOutputStreamSemaphore = new KeyOutputStreamSemaphore(maxConcurrentWritePerKey);
this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval());
this.retryCount = 0;
@@ -179,7 +192,7 @@ public class KeyOutputStream extends OutputStream
}
@Override
- public synchronized void write(int b) throws IOException {
+ public void write(int b) throws IOException {
byte[] buf = new byte[1];
buf[0] = (byte) b;
write(buf, 0, 1);
@@ -200,24 +213,31 @@ public class KeyOutputStream extends OutputStream
@Override
public void write(byte[] b, int off, int len)
throws IOException {
- checkNotClosed();
- if (b == null) {
- throw new NullPointerException();
- }
- if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
- || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- }
- if (len == 0) {
- return;
- }
- synchronized (this) {
- handleWrite(b, off, len, false);
- writeOffset += len;
+ try {
+ getRequestSemaphore().acquire();
+ checkNotClosed();
+ if (b == null) {
+ throw new NullPointerException();
+ }
+      if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+ || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return;
+ }
+
+ synchronized (this) {
+ handleWrite(b, off, len, false);
+ writeOffset += len;
+ }
+ } finally {
+ getRequestSemaphore().release();
}
}
- private void handleWrite(byte[] b, int off, long len, boolean retry)
+ @VisibleForTesting
+ void handleWrite(byte[] b, int off, long len, boolean retry)
throws IOException {
while (len > 0) {
try {
@@ -441,40 +461,51 @@ public class KeyOutputStream extends OutputStream
@Override
public void flush() throws IOException {
- checkNotClosed();
- handleFlushOrClose(StreamAction.FLUSH);
+ try {
+ getRequestSemaphore().acquire();
+ checkNotClosed();
+ handleFlushOrClose(StreamAction.FLUSH);
+ } finally {
+ getRequestSemaphore().release();
+ }
}
@Override
public void hflush() throws IOException {
+ // Note: Semaphore acquired and released inside hsync().
hsync();
}
@Override
public void hsync() throws IOException {
- if (replication.getReplicationType() != ReplicationType.RATIS) {
- throw new UnsupportedOperationException(
- "Replication type is not " + ReplicationType.RATIS);
- }
- if (replication.getRequiredNodes() <= 1) {
- throw new UnsupportedOperationException("The replication factor = "
- + replication.getRequiredNodes() + " <= 1");
- }
- if (ozoneManagerVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
- throw new UnsupportedOperationException("Hsync API requires OM version "
- + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version
"
- + ozoneManagerVersion);
- }
- checkNotClosed();
- final long hsyncPos = writeOffset;
-
- handleFlushOrClose(StreamAction.HSYNC);
+ try {
+ getRequestSemaphore().acquire();
- synchronized (this) {
- Preconditions.checkState(offset >= hsyncPos,
- "offset = %s < hsyncPos = %s", offset, hsyncPos);
- MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
- () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
+ if (replication.getReplicationType() != ReplicationType.RATIS) {
+ throw new UnsupportedOperationException(
+ "Replication type is not " + ReplicationType.RATIS);
+ }
+ if (replication.getRequiredNodes() <= 1) {
+ throw new UnsupportedOperationException("The replication factor = "
+ + replication.getRequiredNodes() + " <= 1");
+ }
+ if (ozoneManagerVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) <
0) {
+ throw new UnsupportedOperationException("Hsync API requires OM version
"
+ + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM
version "
+ + ozoneManagerVersion);
+ }
+ checkNotClosed();
+ final long hsyncPos = writeOffset;
+ handleFlushOrClose(StreamAction.HSYNC);
+
+ synchronized (this) {
+ Preconditions.checkState(offset >= hsyncPos,
+ "offset = %s < hsyncPos = %s", offset, hsyncPos);
+ MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
+ () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
+ }
+ } finally {
+ getRequestSemaphore().release();
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStreamSemaphore.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStreamSemaphore.java
new file mode 100644
index 0000000000..36031a9cf4
--- /dev/null
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStreamSemaphore.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Helper class that streamlines request semaphore usage in KeyOutputStream.
+ */
+public class KeyOutputStreamSemaphore {
+
+  public static final Logger LOG = LoggerFactory.getLogger(KeyOutputStreamSemaphore.class);
+ private final Semaphore requestSemaphore;
+
+ KeyOutputStreamSemaphore(int maxConcurrentWritePerKey) {
+    LOG.info("Initializing semaphore with maxConcurrentWritePerKey = {}", maxConcurrentWritePerKey);
+ if (maxConcurrentWritePerKey > 0) {
+ requestSemaphore = new Semaphore(maxConcurrentWritePerKey);
+ } else if (maxConcurrentWritePerKey == 0) {
+      throw new IllegalArgumentException("Invalid config. ozone.client.key.write.concurrency cannot be set to 0");
+ } else {
+ requestSemaphore = null;
+ }
+ }
+
+ public int getQueueLength() {
+ return requestSemaphore != null ? requestSemaphore.getQueueLength() : 0;
+ }
+
+ public void acquire() throws IOException {
+ if (requestSemaphore != null) {
+ try {
+ LOG.debug("Acquiring semaphore");
+ requestSemaphore.acquire();
+ LOG.debug("Acquired semaphore");
+ } catch (InterruptedException e) {
+ final String errMsg = "Write aborted. Interrupted waiting for
KeyOutputStream semaphore: " + e.getMessage();
+ LOG.error(errMsg);
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException(errMsg);
+ }
+ }
+ }
+
+ public void release() {
+ if (requestSemaphore != null) {
+ LOG.debug("Releasing semaphore");
+ requestSemaphore.release();
+ LOG.debug("Released semaphore");
+ }
+ }
+}
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
new file mode 100644
index 0000000000..6b6abceff3
--- /dev/null
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests KeyOutputStream.
+ * This is a unit test meant to verify specific behaviors of KeyOutputStream.
+ */
+public class TestKeyOutputStream {
+
+ @BeforeAll
+ static void init() {
+ GenericTestUtils.setLogLevel(KeyOutputStreamSemaphore.LOG, Level.DEBUG);
+ }
+
+ @Test
+ void testConcurrentWriteLimitOne() throws Exception {
+ // Verify the semaphore is working to limit the number of concurrent
writes allowed.
+ KeyOutputStreamSemaphore sema1 = new KeyOutputStreamSemaphore(1);
+ KeyOutputStream keyOutputStream = mock(KeyOutputStream.class);
+ when(keyOutputStream.getRequestSemaphore()).thenReturn(sema1);
+
+ final AtomicInteger countWrite = new AtomicInteger(0);
+ // mock write()
+ doAnswer(invocation -> {
+ countWrite.getAndIncrement();
+ return invocation.callRealMethod();
+ }).when(keyOutputStream).write(any(), anyInt(), anyInt());
+
+    final ConcurrentHashMap<Long, CountDownLatch> mapNotifiers = new ConcurrentHashMap<>();
+
+ final AtomicInteger countHandleWrite = new AtomicInteger(0);
+ // mock handleWrite()
+ doAnswer(invocation -> {
+ final long tid = Thread.currentThread().getId();
+ System.out.println("handleWrite() called from tid " + tid);
+ final CountDownLatch latch = mapNotifiers.compute(tid, (k, v) ->
+ v != null ? v : new CountDownLatch(1));
+ countHandleWrite.getAndIncrement();
+ // doing some "work"
+ latch.await();
+ return null;
+ }).when(keyOutputStream).handleWrite(any(), anyInt(), anyLong(),
anyBoolean());
+
+ final Runnable writeRunnable = () -> {
+ try {
+ keyOutputStream.write(new byte[4], 0, 4);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ final Thread thread1 = new Thread(writeRunnable);
+ thread1.start();
+
+ final Thread thread2 = new Thread(writeRunnable);
+ thread2.start();
+
+ // Wait for both threads to enter write()
+ GenericTestUtils.waitFor(() -> countWrite.get() == 2, 100, 3000);
+ // One thread should enter handleWrite()
+ GenericTestUtils.waitFor(() -> countHandleWrite.get() == 1, 100, 3000);
+ // The other thread is waiting on the semaphore
+ GenericTestUtils.waitFor(() -> sema1.getQueueLength() == 1, 100, 3000);
+
+ // handleWrite is triggered only once because of the semaphore and the
synchronized block
+ verify(keyOutputStream, times(1)).handleWrite(any(), anyInt(), anyLong(),
anyBoolean());
+
+ // Now, allow the current thread to finish handleWrite
+ // There is only one thread in handleWrite() so mapNotifiers should have
only one entry.
+ assertEquals(1, mapNotifiers.size());
+ Entry<Long, CountDownLatch> entry =
mapNotifiers.entrySet().stream().findFirst().get();
+ mapNotifiers.remove(entry.getKey());
+ entry.getValue().countDown();
+
+ // Wait for the other thread to proceed
+ GenericTestUtils.waitFor(() -> countHandleWrite.get() == 2, 100, 3000);
+ verify(keyOutputStream, times(2)).handleWrite(any(), anyInt(), anyLong(),
anyBoolean());
+
+ // Allow the other thread to finish handleWrite
+ entry = mapNotifiers.entrySet().stream().findFirst().get();
+ mapNotifiers.remove(entry.getKey());
+ entry.getValue().countDown();
+
+ // Let threads finish
+ thread2.join();
+ thread1.join();
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 98d7388310..91792da76f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -175,6 +175,8 @@ public class TestHSync {
CONF.setTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, 100,
TimeUnit.MILLISECONDS);
CONF.setBoolean("ozone.client.incremental.chunk.list", true);
CONF.setBoolean("ozone.client.stream.putblock.piggybacking", true);
+ // Unlimited key write concurrency
+ CONF.setInt("ozone.client.key.write.concurrency", -1);
CONF.setTimeDuration(OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL,
SERVICE_INTERVAL, TimeUnit.MILLISECONDS);
CONF.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]