This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 8315f08791463eb6a2dc3d5fcd409156ff3e9a13 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Thu Jun 9 23:40:02 2022 -0700 HDDS-6842. [Ozone-Streaming] Reduce the number of watch requests in StreamCommitWatcher. (#3492) --- .../hdds/scm/storage/AbstractDataStreamOutput.java | 3 +- .../hdds/scm/storage/StreamCommitWatcher.java | 39 +++++++++++++--------- .../ozone/freon/OzoneClientKeyGenerator.java | 12 +++---- 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java index e29670d781..cad1d04792 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java @@ -29,6 +29,7 @@ import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Map; +import java.util.Objects; /** * This class is used for error handling methods. @@ -111,7 +112,7 @@ public abstract class AbstractDataStreamOutput if (Thread.currentThread().isInterrupted()) { setExceptionAndThrow(exception); } - Preconditions.checkNotNull(action); + Objects.requireNonNull(action); Preconditions.checkArgument( action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); if (action.delayMillis > 0) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java index 1820416d32..8ca70de816 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java @@ -16,17 +16,13 @@ * limitations under the License. */ -/** - * This class maintains the map of the commitIndexes to be watched for - * successful replication in the datanodes in a given pipeline. It also releases - * the buffers associated with the user data back to {@Link BufferPool} once - * minimum replication criteria is achieved during an ozone key write. - */ package org.apache.hadoop.hdds.scm.storage; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.MemoizedSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +30,9 @@ import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -49,13 +48,15 @@ public class StreamCommitWatcher { LoggerFactory.getLogger(StreamCommitWatcher.class); private Map<Long, List<StreamBuffer>> commitIndexMap; - private List<StreamBuffer> bufferList; + private final List<StreamBuffer> bufferList; // total data which has been successfully flushed and acknowledged // by all servers private long totalAckDataLength; + private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>> + replies = new ConcurrentHashMap<>(); - private XceiverClientSpi xceiverClient; + private final XceiverClientSpi xceiverClient; public StreamCommitWatcher(XceiverClientSpi xceiverClient, List<StreamBuffer> bufferList) { @@ -130,16 +131,24 @@ public class StreamCommitWatcher { */ public XceiverClientReply streamWatchForCommit(long commitIndex) throws IOException { - final long index; + final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier + = JavaUtils.memoize(CompletableFuture::new); + final CompletableFuture<XceiverClientReply> f = replies.compute(commitIndex, + (key, value) -> value != null ? value : supplier.get()); + if (!supplier.isInitialized()) { + // future already exists + return f.join(); + } + try { XceiverClientReply reply = xceiverClient.watchForCommit(commitIndex); - if (reply == null) { - index = 0; - } else { - index = reply.getLogIndex(); - } - adjustBuffers(index); + f.complete(reply); + final CompletableFuture<XceiverClientReply> removed + = replies.remove(commitIndex); + Preconditions.checkState(removed == f); + + adjustBuffers(reply.getLogIndex()); return reply; } catch (InterruptedException e) { // Re-interrupt the thread while catching InterruptedException diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java index 6ab5c03009..43cdfcfc50 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java @@ -24,7 +24,8 @@ import java.util.concurrent.Callable; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; @@ -132,14 +133,13 @@ public class OzoneClientKeyGenerator extends BaseFreonGenerator } private void createStreamKey(long counter) throws Exception { - final ReplicationConfig replicationConfig = ReplicationConfig - .fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE); + final ReplicationConfig conf = ReplicationConfig.fromProtoTypeAndFactor( + ReplicationType.RATIS, ReplicationFactor.THREE); final String key = generateObjectName(counter); timer.time(() -> { - try (OzoneDataStreamOutput stream = bucket - .createStreamKey(key, keySize, replicationConfig, metadata)) { + try (OzoneDataStreamOutput stream = bucket.createStreamKey( + key, keySize, conf, metadata)) { contentGenerator.write(stream); } return null; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
