Repository: hadoop Updated Branches: refs/heads/HDFS-7240 af6dfdf7e -> 15101eef7
HDFS-12246. Ozone: potential thread leaks. Contributed by Weiwei Yang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/15101eef Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/15101eef Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/15101eef Branch: refs/heads/HDFS-7240 Commit: 15101eef75063cd0225523032e84aba8f4b1c4bc Parents: af6dfdf Author: Xiaoyu Yao <[email protected]> Authored: Fri Aug 4 11:11:11 2017 -0700 Committer: Xiaoyu Yao <[email protected]> Committed: Fri Aug 4 11:11:11 2017 -0700 ---------------------------------------------------------------------- .../apache/hadoop/scm/XceiverClientManager.java | 4 +++- .../server/datanode/ObjectStoreHandler.java | 10 ++++------ .../apache/hadoop/ozone/OzoneClientImpl.java | 7 ++++--- .../statemachine/DatanodeStateMachine.java | 20 +++++++++++++++----- .../statemachine/SCMConnectionManager.java | 11 ++++++++++- .../common/transport/server/XceiverServer.java | 3 +++ .../ozone/scm/StorageContainerManager.java | 7 ++++--- .../web/storage/DistributedStorageHandler.java | 7 ++++--- 8 files changed, 47 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/15101eef/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java index 62e5af1..246246f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.scm; +import java.io.Closeable; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.Callable; @@ -53,7 +54,7 @@ import static org.apache.hadoop.scm.ScmConfigKeys * without reestablishing connection. But the connection will be closed if * not being used for a period of time. */ -public class XceiverClientManager { +public class XceiverClientManager implements Closeable { //TODO : change this to SCM configuration class private final Configuration conf; @@ -89,6 +90,7 @@ public class XceiverClientManager { // Mark the entry as evicted XceiverClientSpi info = removalNotification.getValue(); info.setEvicted(); + info.close(); } } }).build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/15101eef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java index 40c7b1c..5b76179 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java @@ -30,6 +30,7 @@ import java.util.Map; import com.sun.jersey.api.container.ContainerFactory; import com.sun.jersey.api.core.ApplicationAdapter; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ksm.protocolPB .KeySpaceManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB; @@ -179,11 +180,8 @@ public final class ObjectStoreHandler implements Closeable { public void close() { LOG.info("Closing ObjectStoreHandler."); storageHandler.close(); - if (this.storageContainerLocationClient != null) { - this.storageContainerLocationClient.close(); - } - if (this.scmBlockLocationClient != null) { - this.scmBlockLocationClient.close(); - } + IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient); + IOUtils.cleanupWithLogger(LOG, scmBlockLocationClient); + IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/15101eef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java index 3bd74a3..feb4586 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; @@ -562,8 +563,8 @@ public class OzoneClientImpl implements OzoneClient, Closeable { @Override public void close() throws IOException { - if(xceiverClientManager != null) { - xceiverClientManager.close(); - } + IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient); + IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient); + IOUtils.cleanupWithLogger(LOG, xceiverClientManager); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/15101eef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 5cac7b0..a15ce75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -53,6 +53,8 @@ public class DatanodeStateMachine implements Closeable { private final CommandDispatcher commandDispatcher; private long commandsHandled; private AtomicLong nextHB; + private Thread stateMachineThread = null; + private Thread cmdProcessThread = null; /** * Constructs a a datanode state machine. @@ -136,6 +138,8 @@ public class DatanodeStateMachine implements Closeable { if (now < nextHB.get()) { Thread.sleep(nextHB.get() - now); } + } catch (InterruptedException e) { + // Ignore this exception. } catch (Exception e) { LOG.error("Unable to finish the execution.", e); } @@ -173,6 +177,12 @@ public class DatanodeStateMachine implements Closeable { */ @Override public void close() throws IOException { + if (stateMachineThread != null) { + stateMachineThread.interrupt(); + } + if (cmdProcessThread != null) { + cmdProcessThread.interrupt(); + } context.setState(DatanodeStates.getLastState()); executorService.shutdown(); try { @@ -189,8 +199,8 @@ public class DatanodeStateMachine implements Closeable { Thread.currentThread().interrupt(); } - for (EndpointStateMachine endPoint : connectionManager.getValues()) { - endPoint.close(); + if (connectionManager != null) { + connectionManager.close(); } if(container != null) { @@ -275,11 +285,11 @@ public class DatanodeStateMachine implements Closeable { LOG.error("Unable to start the DatanodeState Machine", ex); } }; - Thread thread = new ThreadFactoryBuilder() + stateMachineThread = new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("Datanode State Machine Thread - %d") .build().newThread(startStateMachineTask); - thread.start(); + stateMachineThread.start(); } /** @@ -344,7 +354,7 @@ public class DatanodeStateMachine implements Closeable { }; // We will have only one thread for command processing in a datanode. - Thread cmdProcessThread = new Thread(processCommandQueue); + cmdProcessThread = new Thread(processCommandQueue); cmdProcessThread.setDaemon(true); cmdProcessThread.setName("Command processor thread"); cmdProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> { http://git-wip-us.apache.org/repos/asf/hadoop/blob/15101eef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java index a2384e8..9023526 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.container.common.statemachine; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -28,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; @@ -40,7 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * SCMConnectionManager - Acts as a class that manages the membership * information of the SCMs that we are working with. */ -public class SCMConnectionManager { +public class SCMConnectionManager implements Closeable{ private static final Logger LOG = LoggerFactory.getLogger(SCMConnectionManager.class); @@ -132,6 +134,7 @@ public class SCMConnectionManager { StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient = new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy); + EndpointStateMachine endPoint = new EndpointStateMachine(address, rpcClient, conf); scmMachines.put(address, endPoint); @@ -171,4 +174,10 @@ public class SCMConnectionManager { public Collection<EndpointStateMachine> getValues() { return scmMachines.values(); } + + @Override + public void close() throws IOException { + getValues().forEach(endpointStateMachine + -> IOUtils.cleanupWithLogger(LOG, endpointStateMachine)); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/15101eef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java index cd2146b..3a6e672 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -97,6 +97,9 @@ public final class XceiverServer implements XceiverServerSpi { @Override public void stop() { + if (storageContainer != null) { + storageContainer.shutdown(); + } if (bossGroup != null) { bossGroup.shutdownGracefully(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/15101eef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index 409b182..9d3df77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -21,10 +21,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.metrics2.util.MBeans; @@ -610,8 +610,9 @@ public class StorageContainerManager } unregisterMXBean(); - IOUtils.closeQuietly(scmContainerManager); - IOUtils.closeQuietly(scmBlockManager); + IOUtils.cleanupWithLogger(LOG, scmContainerManager); + IOUtils.cleanupWithLogger(LOG, scmBlockManager); + IOUtils.cleanupWithLogger(LOG, scmNodeManager); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/15101eef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index c5754e7..ccc71fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.web.storage; import com.google.common.base.Strings; import org.apache.hadoop.hdfs.server.datanode.fsdataset .LengthInputStream; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ksm.helpers.KsmBucketArgs; import org.apache.hadoop.ksm.helpers.KsmBucketInfo; import org.apache.hadoop.ksm.helpers.KsmKeyArgs; @@ -496,8 +497,8 @@ public final class DistributedStorageHandler implements StorageHandler { */ @Override public void close() { - if(xceiverClientManager != null) { - xceiverClientManager.close(); - } + IOUtils.cleanupWithLogger(LOG, xceiverClientManager); + IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient); + IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
