This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c656504b87 HDDS-9772. Avoid recreating typesafe config objects unnecessarily (#5690)
c656504b87 is described below
commit c656504b871bd0004b29aff0bb51698c90730840
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Nov 30 09:20:41 2023 +0100
HDDS-9772. Avoid recreating typesafe config objects unnecessarily (#5690)
---
.../transport/server/ratis/XceiverServerRatis.java | 41 ++++++++++------------
...lockLocationProtocolClientSideTranslatorPB.java | 2 +-
.../SCMBlockLocationFailoverProxyProvider.java | 28 +++++++--------
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 5 +--
.../hdds/scm/block/SCMBlockDeletingService.java | 15 ++++----
.../hdds/scm/server/StorageContainerManager.java | 2 +-
6 files changed, 44 insertions(+), 49 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 2a57a32b39..0f8f7d4ecc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -136,22 +136,24 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
private final List<ThreadPoolExecutor> chunkExecutors;
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
- private ClientId clientId = ClientId.randomId();
+ private final ClientId clientId = ClientId.randomId();
private final StateContext context;
- private long nodeFailureTimeoutMs;
+ private final long nodeFailureTimeoutMs;
private boolean isStarted = false;
- private DatanodeDetails datanodeDetails;
+ private final DatanodeDetails datanodeDetails;
private final ConfigurationSource conf;
// TODO: Remove the gids set when Ratis supports an api to query active
// pipelines
private final Set<RaftGroupId> raftGids = ConcurrentHashMap.newKeySet();
private final RaftPeerId raftPeerId;
// pipelines for which I am the leader
- private Map<RaftGroupId, Boolean> groupLeaderMap = new ConcurrentHashMap<>();
+ private final Map<RaftGroupId, Boolean> groupLeaderMap =
+ new ConcurrentHashMap<>();
// Timeout used while calling submitRequest directly.
- private long requestTimeout;
- private boolean shouldDeleteRatisLogDirectory;
- private boolean streamEnable;
+ private final long requestTimeout;
+ private final boolean shouldDeleteRatisLogDirectory;
+ private final boolean streamEnable;
+ private final DatanodeRatisServerConfig ratisServerConfig;
private XceiverServerRatis(DatanodeDetails dd,
ContainerDispatcher dispatcher, ContainerController containerController,
@@ -160,6 +162,7 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
this.conf = conf;
Objects.requireNonNull(dd, "id == null");
datanodeDetails = dd;
+ ratisServerConfig = conf.getObject(DatanodeRatisServerConfig.class);
assignPorts();
this.streamEnable = conf.getBoolean(
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED,
@@ -171,12 +174,9 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
this.raftPeerId = RatisHelper.toRaftPeerId(dd);
String threadNamePrefix = datanodeDetails.threadNamePrefix();
chunkExecutors = createChunkExecutors(conf, threadNamePrefix);
- nodeFailureTimeoutMs =
- conf.getObject(DatanodeRatisServerConfig.class)
- .getFollowerSlownessTimeout();
+ nodeFailureTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
shouldDeleteRatisLogDirectory =
- conf.getObject(DatanodeRatisServerConfig.class)
- .shouldDeleteRatisLogDirectory();
+ ratisServerConfig.shouldDeleteRatisLogDirectory();
this.server =
RaftServer.newBuilder().setServerId(raftPeerId)
@@ -237,13 +237,10 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
RatisHelper.enableNettyStreaming(properties);
NettyConfigKeys.DataStream.setPort(properties, dataStreamPort);
int dataStreamAsyncRequestThreadPoolSize =
- conf.getObject(DatanodeRatisServerConfig.class)
- .getStreamRequestThreads();
+ ratisServerConfig.getStreamRequestThreads();
RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties,
dataStreamAsyncRequestThreadPoolSize);
- int dataStreamClientPoolSize =
- conf.getObject(DatanodeRatisServerConfig.class)
- .getClientPoolSize();
+ int dataStreamClientPoolSize = ratisServerConfig.getClientPoolSize();
RaftServerConfigKeys.DataStream.setClientPoolSize(properties,
dataStreamClientPoolSize);
}
@@ -309,13 +306,13 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
// Disable the pre vote feature in Ratis
RaftServerConfigKeys.LeaderElection.setPreVote(properties,
- conf.getObject(DatanodeRatisServerConfig.class).isPreVoteEnabled());
+ ratisServerConfig.isPreVoteEnabled());
// Set the ratis storage directory
Collection<String> storageDirPaths =
HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
List<File> storageDirs = new ArrayList<>(storageDirPaths.size());
- storageDirPaths.stream().forEach(d -> storageDirs.add(new File(d)));
+ storageDirPaths.forEach(d -> storageDirs.add(new File(d)));
RaftServerConfigKeys.setStorageDir(properties, storageDirs);
@@ -621,16 +618,16 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
@Override
public void submitRequest(ContainerCommandRequestProto request,
HddsProtos.PipelineID pipelineID) throws IOException {
- RaftClientReply reply = null;
Span span = TracingUtil
.importAndCreateSpan(
"XceiverServerRatis." + request.getCmdType().name(),
request.getTraceID());
- try (Scope scope = GlobalTracer.get().activateSpan(span)) {
+ try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
RaftClientRequest raftClientRequest =
createRaftClientRequest(request, pipelineID,
RaftClientRequest.writeRequestType());
+ RaftClientReply reply;
try {
reply = server.submitClientRequestAsync(raftClientRequest)
.get(requestTimeout, TimeUnit.MILLISECONDS);
@@ -896,7 +893,7 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
GroupInfoReply reply = getServer()
.getGroupInfo(createGroupInfoRequest(pipelineID.getProtobuf()));
minIndex = RatisHelper.getMinReplicatedIndex(reply.getCommitInfos());
- return minIndex == null ? -1 : minIndex.longValue();
+ return minIndex == null ? -1 : minIndex;
}
public void notifyGroupRemove(RaftGroupId gid) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index eb19e46b7a..3c20295bd6 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -94,7 +94,7 @@ public final class
ScmBlockLocationProtocolClientSideTranslatorPB
this.failoverProxyProvider = proxyProvider;
this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create(
ScmBlockLocationProtocolPB.class, failoverProxyProvider,
- failoverProxyProvider.getSCMBlockLocationRetryPolicy(null));
+ failoverProxyProvider.getSCMBlockLocationRetryPolicy());
}
/**
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
index 5cff888830..0c58e8cc51 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
@@ -56,9 +56,10 @@ public class SCMBlockLocationFailoverProxyProvider implements
FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class);
+ private final SCMClientConfig scmClientConfig;
- private Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies;
- private Map<String, SCMProxyInfo> scmProxyInfoMap;
+ private final Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies;
+ private final Map<String, SCMProxyInfo> scmProxyInfoMap;
private List<String> scmNodeIds;
// As SCM Client is shared across threads, performFailOver()
@@ -107,9 +108,9 @@ public class SCMBlockLocationFailoverProxyProvider
implements
this.currentProxyIndex = 0;
currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- SCMClientConfig config = conf.getObject(SCMClientConfig.class);
- this.maxRetryCount = config.getRetryCount();
- this.retryInterval = config.getRetryInterval();
+ scmClientConfig = conf.getObject(SCMClientConfig.class);
+ this.maxRetryCount = scmClientConfig.getRetryCount();
+ this.retryInterval = scmClientConfig.getRetryInterval();
LOG.info("Created block location fail-over proxy with {} nodes: {}",
scmNodeIds.size(), scmProxyInfoMap.values());
@@ -146,15 +147,15 @@ public class SCMBlockLocationFailoverProxyProvider
implements
nextProxyIndex();
}
- @VisibleForTesting
- public synchronized String getCurrentProxySCMNodeId() {
+ private synchronized String getCurrentProxySCMNodeId() {
return currentProxySCMNodeId;
}
@Override
public synchronized ProxyInfo<ScmBlockLocationProtocolPB> getProxy() {
String currentProxyNodeId = getCurrentProxySCMNodeId();
- ProxyInfo currentProxyInfo = scmProxies.get(currentProxyNodeId);
+ ProxyInfo<ScmBlockLocationProtocolPB> currentProxyInfo =
+ scmProxies.get(currentProxyNodeId);
if (currentProxyInfo == null) {
currentProxyInfo = createSCMProxy(currentProxyNodeId);
}
@@ -238,8 +239,8 @@ public class SCMBlockLocationFailoverProxyProvider
implements
/**
* Creates proxy object.
*/
- private ProxyInfo createSCMProxy(String nodeId) {
- ProxyInfo proxyInfo;
+ private ProxyInfo<ScmBlockLocationProtocolPB> createSCMProxy(String nodeId) {
+ ProxyInfo<ScmBlockLocationProtocolPB> proxyInfo;
SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
InetSocketAddress address = scmProxyInfo.getAddress();
try {
@@ -269,12 +270,12 @@ public class SCMBlockLocationFailoverProxyProvider
implements
return RPC.getProtocolProxy(ScmBlockLocationProtocolPB.class, scmVersion,
scmAddress, ugi, hadoopConf,
NetUtils.getDefaultSocketFactory(hadoopConf),
- (int)conf.getObject(SCMClientConfig.class).getRpcTimeOut(),
+ (int) scmClientConfig.getRpcTimeOut(),
connectionRetryPolicy).getProxy();
}
- public RetryPolicy getSCMBlockLocationRetryPolicy(String newLeader) {
- RetryPolicy retryPolicy = new RetryPolicy() {
+ public RetryPolicy getSCMBlockLocationRetryPolicy() {
+ return new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception e, int retry,
int failover, boolean b) {
@@ -287,7 +288,6 @@ public class SCMBlockLocationFailoverProxyProvider
implements
getRetryInterval());
}
};
- return retryPolicy;
}
public synchronized int getCurrentProxyIndex() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 48830cafbd..1260ea6a00 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
@@ -79,7 +80,7 @@ public class BlockManagerImpl implements BlockManager,
BlockmanagerMXBean {
* @throws IOException
*/
public BlockManagerImpl(final ConfigurationSource conf,
- final StorageContainerManager scm)
+ ScmConfig scmConfig, final StorageContainerManager scm)
throws IOException {
Objects.requireNonNull(scm, "SCM cannot be null");
this.scm = scm;
@@ -108,7 +109,7 @@ public class BlockManagerImpl implements BlockManager,
BlockmanagerMXBean {
blockDeletingService =
new SCMBlockDeletingService(deletedBlockLog,
scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(),
- scm.getSCMServiceManager(), conf,
+ scm.getSCMServiceManager(), conf, scmConfig,
metrics, scm.getSystemClock(), scm.getReconfigurationHandler());
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 16bfed2bba..5fd889b758 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -81,8 +81,7 @@ public class SCMBlockDeletingService extends BackgroundService
private final SCMContext scmContext;
private final ScmConfig scmConf;
- private int blockDeleteLimitSize;
- private ScmBlockDeletingServiceMetrics metrics;
+ private final ScmBlockDeletingServiceMetrics metrics;
/**
* SCMService related variables.
@@ -99,11 +98,10 @@ public class SCMBlockDeletingService extends
BackgroundService
NodeManager nodeManager, EventPublisher eventPublisher,
SCMContext scmContext, SCMServiceManager serviceManager,
ConfigurationSource conf,
- ScmBlockDeletingServiceMetrics metrics,
+ ScmConfig scmConfig, ScmBlockDeletingServiceMetrics metrics,
Clock clock, ReconfigurationHandler reconfigurationHandler) {
super("SCMBlockDeletingService",
- conf.getObject(ScmConfig.class)
- .getBlockDeletionInterval().toMillis(),
+ scmConfig.getBlockDeletionInterval().toMillis(),
TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE,
conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
@@ -119,11 +117,10 @@ public class SCMBlockDeletingService extends
BackgroundService
this.eventPublisher = eventPublisher;
this.scmContext = scmContext;
this.metrics = metrics;
- scmConf = conf.getObject(ScmConfig.class);
- reconfigurationHandler.register(scmConf);
- blockDeleteLimitSize = scmConf.getBlockDeletionLimit();
- Preconditions.checkArgument(blockDeleteLimitSize > 0,
+ scmConf = scmConfig;
+ Preconditions.checkArgument(scmConf.getBlockDeletionLimit() > 0,
"Block deletion limit should be positive.");
+ reconfigurationHandler.register(scmConf);
// register SCMBlockDeletingService to SCMServiceManager
serviceManager.register(this);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index edba7bb6d3..59a533a665 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -806,7 +806,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
if (configurator.getScmBlockManager() != null) {
scmBlockManager = configurator.getScmBlockManager();
} else {
- scmBlockManager = new BlockManagerImpl(conf, this);
+ scmBlockManager = new BlockManagerImpl(conf, scmConfig, this);
}
if (configurator.getReplicationManager() != null) {
replicationManager = configurator.getReplicationManager();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]