This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 885d341dc46 HDDS-14571. Remove synchronized methods from XceiverClientGrpc (#9718)
885d341dc46 is described below
commit 885d341dc463cfd377f766df2be8ec9b28a153b3
Author: Rishabh Patel <[email protected]>
AuthorDate: Mon Feb 23 22:56:54 2026 -0800
HDDS-14571. Remove synchronized methods from XceiverClientGrpc (#9718)
---
.../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 109 ++++++++++++++-------
1 file changed, 76 insertions(+), 33 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 6cd258c5ba3..7c639e766eb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -34,6 +34,8 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -84,8 +86,9 @@
  * how it works, and how it is integrated with the Ozone client.
  */
 public class XceiverClientGrpc extends XceiverClientSpi {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(XceiverClientGrpc.class);
+  private static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
+  private static final int SHUTDOWN_WAIT_INTERVAL_MILLIS = 100;
+  private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5;
   private final Pipeline pipeline;
   private final ConfigurationSource config;
   private final Map<DatanodeID, XceiverClientProtocolServiceStub> asyncStubs;
@@ -100,7 +103,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   // command can be sent to the same DN.
   private final Map<DatanodeBlockID, DatanodeDetails> getBlockDNcache;
 
-  private boolean closed = false;
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
   /**
    * Constructs a client that can communicate with the Container framework on
@@ -115,17 +118,16 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
     super();
     Objects.requireNonNull(pipeline, "pipeline == null");
     Objects.requireNonNull(config, "config == null");
-    setTimeout(config.getTimeDuration(OzoneConfigKeys.
-        OZONE_CLIENT_READ_TIMEOUT, OzoneConfigKeys
-        .OZONE_CLIENT_READ_TIMEOUT_DEFAULT, TimeUnit.SECONDS));
+    setTimeout(config.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT,
+        OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT_DEFAULT, TimeUnit.SECONDS));
     this.pipeline = pipeline;
     this.config = config;
     this.secConfig = new SecurityConfig(config);
     this.semaphore =
         new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
-    this.channels = new HashMap<>();
-    this.asyncStubs = new HashMap<>();
+    this.channels = new ConcurrentHashMap<>();
+    this.asyncStubs = new ConcurrentHashMap<>();
     this.topologyAwareRead = config.getBoolean(
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
@@ -152,34 +154,56 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config) {
   public void connect() throws Exception {
     // connect to the closest node, if closest node doesn't exist, delegate to
     // first node, which is usually the leader in the pipeline.
-    DatanodeDetails dn = topologyAwareRead ? this.pipeline.getClosestNode() :
-        this.pipeline.getFirstNode();
+    DatanodeDetails dn = topologyAwareRead
+        ? this.pipeline.getClosestNode()
+        : this.pipeline.getFirstNode();
     // just make a connection to the picked datanode at the beginning
     connectToDatanode(dn);
   }
 
-  private synchronized void connectToDatanode(DatanodeDetails dn)
+  private void connectToDatanode(DatanodeDetails dn)
       throws IOException {
+    if (isClosed.get()) {
+      throw new IOException("Client is closed.");
+    }
+
     if (isConnected(dn)) {
       return;
     }
-    // read port from the data node, on failure use default configured
-    // port.
+    // read port from the data node, on failure use default configured port
     int port = dn.getStandalonePort().getValue();
     if (port == 0) {
       port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
           OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
     }
+    final int finalPort = port;
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ",
-          dn, pipeline.getNodes());
+    LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn, pipeline.getNodes());
+
+    channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
+      if (channel.isTerminated() || channel.isShutdown()) {
+        asyncStubs.remove(dnId);
+        return null; // removes from channels map
+      }
+
+      return channel;
+    });
+
+    ManagedChannel channel;
+    try {
+      channel = channels.computeIfAbsent(dn.getID(), dnId -> {
+        try {
+          return createChannel(dn, finalPort).build();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      });
+    } catch (RuntimeException e) {
+      LOG.error("Failed to create channel to datanode {}", dn, e);
+      throw new IOException(e.getCause());
     }
-    ManagedChannel channel = createChannel(dn, port).build();
-    XceiverClientProtocolServiceStub asyncStub =
-        XceiverClientProtocolServiceGrpc.newStub(channel);
-    asyncStubs.put(dn.getID(), asyncStub);
-    channels.put(dn.getID(), channel);
+
+    asyncStubs.computeIfAbsent(dn.getID(), dnId -> XceiverClientProtocolServiceGrpc.newStub(channel));
   }
 
   protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port)
@@ -235,24 +259,43 @@ private boolean isConnected(ManagedChannel channel) {
    * Closes all the communication channels of the client one-by-one.
    * When a channel is closed, no further requests can be sent via the channel,
    * and the method waits to finish all ongoing communication.
-   *
-   * Note: the method wait 1 hour per channel tops and if that is not enough
-   * to finish ongoing communication, then interrupts the connection anyway.
    */
   @Override
-  public synchronized void close() {
-    closed = true;
+  public void close() {
+    if (!isClosed.compareAndSet(false, true)) {
+      // we should allow only one thread to perform the close operation to make it idempotent
+      return;
+    }
+
     for (ManagedChannel channel : channels.values()) {
-      channel.shutdownNow();
+      channel.shutdown();
+    }
+
+    final long maxWaitNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS);
+    long deadline = System.nanoTime() + maxWaitNanos;
+    List<ManagedChannel> nonTerminatedChannels = new ArrayList<>(channels.values());
+
+    while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) {
+      nonTerminatedChannels.removeIf(ManagedChannel::isTerminated);
       try {
-        channel.awaitTermination(60, TimeUnit.MINUTES);
+        Thread.sleep(SHUTDOWN_WAIT_INTERVAL_MILLIS);
       } catch (InterruptedException e) {
-        LOG.error("InterruptedException while waiting for channel termination",
-            e);
-        // Re-interrupt the thread while catching InterruptedException
+        LOG.error("Interrupted while waiting for channels to terminate", e);
         Thread.currentThread().interrupt();
+        break;
       }
     }
+
+    List<DatanodeID> failedChannels = channels.entrySet().stream()
+        .filter(e -> !e.getValue().isTerminated())
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+    if (!failedChannels.isEmpty()) {
+      LOG.warn("Channels {} did not terminate within timeout.", failedChannels);
+    }
+
+    channels.clear();
+    asyncStubs.clear();
   }
 
   @Override
@@ -694,9 +737,9 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
     return new XceiverClientReply(replyFuture);
   }
 
-  private synchronized void checkOpen(DatanodeDetails dn)
+  private void checkOpen(DatanodeDetails dn)
       throws IOException {
-    if (closed) {
+    if (isClosed.get()) {
       throw new IOException("This channel is not connected.");
     }
 
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]