ChenSammi commented on code in PR #7456:
URL: https://github.com/apache/ozone/pull/7456#discussion_r1856195208
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java:
##########
@@ -150,24 +197,50 @@ public void releaseClient(XceiverClientSpi client,
boolean invalidateClient,
}
}
- protected XceiverClientSpi getClient(Pipeline pipeline, boolean
topologyAware)
+ protected XceiverClientSpi getClient(Pipeline pipeline, boolean
topologyAware, boolean allowShortCircuit)
throws IOException {
try {
// create different client different pipeline node based on
// network topology
- String key = getPipelineCacheKey(pipeline, topologyAware);
- return clientCache.get(key, () -> newClient(pipeline));
+ String key = getPipelineCacheKey(pipeline, topologyAware,
allowShortCircuit);
+ if (key.endsWith(DomainSocketFactory.FEATURE_FLAG)) {
+ final Pipeline newPipeline =
Pipeline.newBuilder(pipeline).setReplicationConfig(
+ ReplicationConfig.fromTypeAndFactor(ReplicationType.SHORT_CIRCUIT,
+
ReplicationFactor.valueOf(pipeline.getReplicationConfig().getReplication()))).build();
+ return clientCache.get(key, () -> newClient(newPipeline,
localDNCache.get(key)));
+ } else {
+ return clientCache.get(key, () -> newClient(pipeline));
+ }
} catch (Exception e) {
throw new IOException(
"Exception getting XceiverClient: " + e, e);
}
}
- private String getPipelineCacheKey(Pipeline pipeline,
- boolean topologyAware) {
- String key = pipeline.getId().getId().toString() + pipeline.getType();
+ private String getPipelineCacheKey(Pipeline pipeline, boolean topologyAware,
boolean allowShortCircuit) {
+ String key = pipeline.getId().getId().toString() + "-" +
pipeline.getType();
boolean isEC = pipeline.getType() == HddsProtos.ReplicationType.EC;
- if (topologyAware || isEC) {
+ DatanodeDetails localDN = null;
+
+ if ((!isEC) && allowShortCircuit && isShortCircuitEnabled()) {
+ int port = 0;
+ InetSocketAddress localAddr = null;
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ // read port from the data node, on failure use default configured
port.
+ port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
+ InetSocketAddress addr = NetUtils.createSocketAddr(dn.getIpAddress(),
port);
+ if (OzoneNetUtils.isAddressLocal(addr) &&
+ dn.getCurrentVersion() >= SHORT_CIRCUIT_READS.toProtoValue()) {
+ localAddr = addr;
+ localDN = dn;
+ break;
+ }
+ }
+ if (localAddr != null) {
+ // Find a local DN and short circuit read is enabled
+ key += "@" + localAddr.getHostName() + ":" + port + "/" +
DomainSocketFactory.FEATURE_FLAG;
+ }
+ } else if (topologyAware || isEC) {
Review Comment:
Good point.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]