HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package. Contributed by Takanobu Asanuma.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3d019c3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3d019c3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3d019c3 Branch: refs/heads/HDFS-7240 Commit: d3d019c337ecc10e9c6bbefc3a97c6cd1f5283c3 Parents: 64d30a6 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Fri May 1 15:11:09 2015 -0700 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Fri May 1 15:12:18 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../java/org/apache/hadoop/hdfs/DFSClient.java | 13 +- .../org/apache/hadoop/hdfs/LeaseRenewer.java | 512 ------------------ .../hadoop/hdfs/client/impl/LeaseRenewer.java | 514 +++++++++++++++++++ .../hadoop/hdfs/TestDistributedFileSystem.java | 59 ++- .../java/org/apache/hadoop/hdfs/TestLease.java | 1 + .../apache/hadoop/hdfs/TestLeaseRenewer.java | 207 -------- .../hdfs/client/impl/TestLeaseRenewer.java | 209 ++++++++ 8 files changed, 769 insertions(+), 749 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b5c5e6b..179fe7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -494,6 +494,9 @@ Release 2.8.0 - UNRELEASED HDFS-8292. Move conditional in fmt_time from dfs-dust.js to status.html. (Charles Lamb via wang) + HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package. (Takanobu + Asanuma via szetszwo) + OPTIMIZATIONS HDFS-8026. 
Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index d47992b..aaba543 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -101,6 +101,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.AclException; @@ -481,7 +482,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * enforced to consistently update its local dfsclients array and * client's filesBeingWritten map. */ - void putFileBeingWritten(final long inodeId, final DFSOutputStream out) { + public void putFileBeingWritten(final long inodeId, final DFSOutputStream out) { synchronized(filesBeingWritten) { filesBeingWritten.put(inodeId, out); // update the last lease renewal time only when there was no @@ -494,7 +495,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } /** Remove a file. Only called from LeaseRenewer. 
*/ - void removeFileBeingWritten(final long inodeId) { + public void removeFileBeingWritten(final long inodeId) { synchronized(filesBeingWritten) { filesBeingWritten.remove(inodeId); if (filesBeingWritten.isEmpty()) { @@ -504,14 +505,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } /** Is file-being-written map empty? */ - boolean isFilesBeingWrittenEmpty() { + public boolean isFilesBeingWrittenEmpty() { synchronized(filesBeingWritten) { return filesBeingWritten.isEmpty(); } } /** @return true if the client is running */ - boolean isClientRunning() { + public boolean isClientRunning() { return clientRunning; } @@ -533,7 +534,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @return true if lease was renewed. May return false if this * client has been closed or has no files open. **/ - boolean renewLease() throws IOException { + public boolean renewLease() throws IOException { if (clientRunning && !isFilesBeingWrittenEmpty()) { try { namenode.renewLease(clientName); @@ -565,7 +566,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } /** Abort and release resources held. Ignore all errors. */ - void abort() { + public void abort() { clientRunning = false; closeAllFilesBeingWritten(true); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java deleted file mode 100644 index 511bddb..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java +++ /dev/null @@ -1,512 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Time; -import com.google.common.annotations.VisibleForTesting; - -/** - * <p> - * Used by {@link DFSClient} for renewing file-being-written leases - * on the namenode. - * When a file is opened for write (create or append), - * namenode stores a file lease for recording the identity of the writer. - * The writer (i.e. the DFSClient) is required to renew the lease periodically. 
- * When the lease is not renewed before it expires, - * the namenode considers the writer as failed and then it may either let - * another writer to obtain the lease or close the file. - * </p> - * <p> - * This class also provides the following functionality: - * <ul> - * <li> - * It maintains a map from (namenode, user) pairs to lease renewers. - * The same {@link LeaseRenewer} instance is used for renewing lease - * for all the {@link DFSClient} to the same namenode and the same user. - * </li> - * <li> - * Each renewer maintains a list of {@link DFSClient}. - * Periodically the leases for all the clients are renewed. - * A client is removed from the list when the client is closed. - * </li> - * <li> - * A thread per namenode per user is used by the {@link LeaseRenewer} - * to renew the leases. - * </li> - * </ul> - * </p> - */ -@InterfaceAudience.Private -class LeaseRenewer { - static final Log LOG = LogFactory.getLog(LeaseRenewer.class); - - static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L; - static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; - - /** Get a {@link LeaseRenewer} instance */ - static LeaseRenewer getInstance(final String authority, - final UserGroupInformation ugi, final DFSClient dfsc) throws IOException { - final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi); - r.addClient(dfsc); - return r; - } - - /** - * A factory for sharing {@link LeaseRenewer} objects - * among {@link DFSClient} instances - * so that there is only one renewer per authority per user. 
- */ - private static class Factory { - private static final Factory INSTANCE = new Factory(); - - private static class Key { - /** Namenode info */ - final String authority; - /** User info */ - final UserGroupInformation ugi; - - private Key(final String authority, final UserGroupInformation ugi) { - if (authority == null) { - throw new HadoopIllegalArgumentException("authority == null"); - } else if (ugi == null) { - throw new HadoopIllegalArgumentException("ugi == null"); - } - - this.authority = authority; - this.ugi = ugi; - } - - @Override - public int hashCode() { - return authority.hashCode() ^ ugi.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (obj != null && obj instanceof Key) { - final Key that = (Key)obj; - return this.authority.equals(that.authority) - && this.ugi.equals(that.ugi); - } - return false; - } - - @Override - public String toString() { - return ugi.getShortUserName() + "@" + authority; - } - } - - /** A map for per user per namenode renewers. */ - private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>(); - - /** Get a renewer. */ - private synchronized LeaseRenewer get(final String authority, - final UserGroupInformation ugi) { - final Key k = new Key(authority, ugi); - LeaseRenewer r = renewers.get(k); - if (r == null) { - r = new LeaseRenewer(k); - renewers.put(k, r); - } - return r; - } - - /** Remove the given renewer. */ - private synchronized void remove(final LeaseRenewer r) { - final LeaseRenewer stored = renewers.get(r.factorykey); - //Since a renewer may expire, the stored renewer can be different. - if (r == stored) { - if (!r.clientsRunning()) { - renewers.remove(r.factorykey); - } - } - } - } - - /** The time in milliseconds that the map became empty. 
*/ - private long emptyTime = Long.MAX_VALUE; - /** A fixed lease renewal time period in milliseconds */ - private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD/2; - - /** A daemon for renewing lease */ - private Daemon daemon = null; - /** Only the daemon with currentId should run. */ - private int currentId = 0; - - /** - * A period in milliseconds that the lease renewer thread should run - * after the map became empty. - * In other words, - * if the map is empty for a time period longer than the grace period, - * the renewer should terminate. - */ - private long gracePeriod; - /** - * The time period in milliseconds - * that the renewer sleeps for each iteration. - */ - private long sleepPeriod; - - private final Factory.Key factorykey; - - /** A list of clients corresponding to this renewer. */ - private final List<DFSClient> dfsclients = new ArrayList<DFSClient>(); - - /** - * A stringified stack trace of the call stack when the Lease Renewer - * was instantiated. This is only generated if trace-level logging is - * enabled on this class. - */ - private final String instantiationTrace; - - private LeaseRenewer(Factory.Key factorykey) { - this.factorykey = factorykey; - unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT); - - if (LOG.isTraceEnabled()) { - instantiationTrace = StringUtils.stringifyException( - new Throwable("TRACE")); - } else { - instantiationTrace = null; - } - } - - /** @return the renewal time in milliseconds. */ - private synchronized long getRenewalTime() { - return renewal; - } - - /** Add a client. */ - private synchronized void addClient(final DFSClient dfsc) { - for(DFSClient c : dfsclients) { - if (c == dfsc) { - //client already exists, nothing to do. 
- return; - } - } - //client not found, add it - dfsclients.add(dfsc); - - //update renewal time - final int hdfsTimeout = dfsc.getConf().getHdfsTimeout(); - if (hdfsTimeout > 0) { - final long half = hdfsTimeout/2; - if (half < renewal) { - this.renewal = half; - } - } - } - - private synchronized boolean clientsRunning() { - for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) { - if (!i.next().isClientRunning()) { - i.remove(); - } - } - return !dfsclients.isEmpty(); - } - - private synchronized long getSleepPeriod() { - return sleepPeriod; - } - - /** Set the grace period and adjust the sleep period accordingly. */ - synchronized void setGraceSleepPeriod(final long gracePeriod) { - unsyncSetGraceSleepPeriod(gracePeriod); - } - - private void unsyncSetGraceSleepPeriod(final long gracePeriod) { - if (gracePeriod < 100L) { - throw new HadoopIllegalArgumentException(gracePeriod - + " = gracePeriod < 100ms is too small."); - } - this.gracePeriod = gracePeriod; - final long half = gracePeriod/2; - this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT? - half: LEASE_RENEWER_SLEEP_DEFAULT; - } - - /** Is the daemon running? */ - synchronized boolean isRunning() { - return daemon != null && daemon.isAlive(); - } - - /** Does this renewer have nothing to renew? */ - public boolean isEmpty() { - return dfsclients.isEmpty(); - } - - /** Used only by tests */ - synchronized String getDaemonName() { - return daemon.getName(); - } - - /** Is the empty period longer than the grace period? */ - private synchronized boolean isRenewerExpired() { - return emptyTime != Long.MAX_VALUE - && Time.monotonicNow() - emptyTime > gracePeriod; - } - - synchronized void put(final long inodeId, final DFSOutputStream out, - final DFSClient dfsc) { - if (dfsc.isClientRunning()) { - if (!isRunning() || isRenewerExpired()) { - //start a new deamon with a new id. 
- final int id = ++currentId; - daemon = new Daemon(new Runnable() { - @Override - public void run() { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Lease renewer daemon for " + clientsString() - + " with renew id " + id + " started"); - } - LeaseRenewer.this.run(id); - } catch(InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug(LeaseRenewer.this.getClass().getSimpleName() - + " is interrupted.", e); - } - } finally { - synchronized(LeaseRenewer.this) { - Factory.INSTANCE.remove(LeaseRenewer.this); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Lease renewer daemon for " + clientsString() - + " with renew id " + id + " exited"); - } - } - } - - @Override - public String toString() { - return String.valueOf(LeaseRenewer.this); - } - }); - daemon.start(); - } - dfsc.putFileBeingWritten(inodeId, out); - emptyTime = Long.MAX_VALUE; - } - } - - @VisibleForTesting - synchronized void setEmptyTime(long time) { - emptyTime = time; - } - - /** Close a file. */ - void closeFile(final long inodeId, final DFSClient dfsc) { - dfsc.removeFileBeingWritten(inodeId); - - synchronized(this) { - if (dfsc.isFilesBeingWrittenEmpty()) { - dfsclients.remove(dfsc); - } - //update emptyTime if necessary - if (emptyTime == Long.MAX_VALUE) { - for(DFSClient c : dfsclients) { - if (!c.isFilesBeingWrittenEmpty()) { - //found a non-empty file-being-written map - return; - } - } - //discover the first time that all file-being-written maps are empty. - emptyTime = Time.monotonicNow(); - } - } - } - - /** Close the given client. */ - synchronized void closeClient(final DFSClient dfsc) { - dfsclients.remove(dfsc); - if (dfsclients.isEmpty()) { - if (!isRunning() || isRenewerExpired()) { - Factory.INSTANCE.remove(LeaseRenewer.this); - return; - } - if (emptyTime == Long.MAX_VALUE) { - //discover the first time that the client list is empty. 
- emptyTime = Time.monotonicNow(); - } - } - - //update renewal time - if (renewal == dfsc.getConf().getHdfsTimeout()/2) { - long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD; - for(DFSClient c : dfsclients) { - final int timeout = c.getConf().getHdfsTimeout(); - if (timeout > 0 && timeout < min) { - min = timeout; - } - } - renewal = min/2; - } - } - - void interruptAndJoin() throws InterruptedException { - Daemon daemonCopy = null; - synchronized (this) { - if (isRunning()) { - daemon.interrupt(); - daemonCopy = daemon; - } - } - - if (daemonCopy != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Wait for lease checker to terminate"); - } - daemonCopy.join(); - } - } - - private void renew() throws IOException { - final List<DFSClient> copies; - synchronized(this) { - copies = new ArrayList<DFSClient>(dfsclients); - } - //sort the client names for finding out repeated names. - Collections.sort(copies, new Comparator<DFSClient>() { - @Override - public int compare(final DFSClient left, final DFSClient right) { - return left.getClientName().compareTo(right.getClientName()); - } - }); - String previousName = ""; - for(int i = 0; i < copies.size(); i++) { - final DFSClient c = copies.get(i); - //skip if current client name is the same as the previous name. - if (!c.getClientName().equals(previousName)) { - if (!c.renewLease()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Did not renew lease for client " + - c); - } - continue; - } - previousName = c.getClientName(); - if (LOG.isDebugEnabled()) { - LOG.debug("Lease renewed for client " + previousName); - } - } - } - } - - /** - * Periodically check in with the namenode and renew all the leases - * when the lease period is half over. 
- */ - private void run(final int id) throws InterruptedException { - for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted(); - Thread.sleep(getSleepPeriod())) { - final long elapsed = Time.monotonicNow() - lastRenewed; - if (elapsed >= getRenewalTime()) { - try { - renew(); - if (LOG.isDebugEnabled()) { - LOG.debug("Lease renewer daemon for " + clientsString() - + " with renew id " + id + " executed"); - } - lastRenewed = Time.monotonicNow(); - } catch (SocketTimeoutException ie) { - LOG.warn("Failed to renew lease for " + clientsString() + " for " - + (elapsed/1000) + " seconds. Aborting ...", ie); - synchronized (this) { - while (!dfsclients.isEmpty()) { - dfsclients.get(0).abort(); - } - } - break; - } catch (IOException ie) { - LOG.warn("Failed to renew lease for " + clientsString() + " for " - + (elapsed/1000) + " seconds. Will retry shortly ...", ie); - } - } - - synchronized(this) { - if (id != currentId || isRenewerExpired()) { - if (LOG.isDebugEnabled()) { - if (id != currentId) { - LOG.debug("Lease renewer daemon for " + clientsString() - + " with renew id " + id + " is not current"); - } else { - LOG.debug("Lease renewer daemon for " + clientsString() - + " with renew id " + id + " expired"); - } - } - //no longer the current daemon or expired - return; - } - - // if no clients are in running state or there is no more clients - // registered with this renewer, stop the daemon after the grace - // period. 
- if (!clientsRunning() && emptyTime == Long.MAX_VALUE) { - emptyTime = Time.monotonicNow(); - } - } - } - } - - @Override - public String toString() { - String s = getClass().getSimpleName() + ":" + factorykey; - if (LOG.isTraceEnabled()) { - return s + ", clients=" + clientsString() - + ", created at " + instantiationTrace; - } - return s; - } - - /** Get the names of all clients */ - private synchronized String clientsString() { - if (dfsclients.isEmpty()) { - return "[]"; - } else { - final StringBuilder b = new StringBuilder("[").append( - dfsclients.get(0).getClientName()); - for(int i = 1; i < dfsclients.size(); i++) { - b.append(", ").append(dfsclients.get(i).getClientName()); - } - return b.append("]").toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java new file mode 100644 index 0000000..4cdf168 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java @@ -0,0 +1,514 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import com.google.common.annotations.VisibleForTesting; + +/** + * <p> + * Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing file-being-written leases + * on the namenode. + * When a file is opened for write (create or append), + * namenode stores a file lease for recording the identity of the writer. + * The writer (i.e. the DFSClient) is required to renew the lease periodically. + * When the lease is not renewed before it expires, + * the namenode considers the writer as failed and then it may either let + * another writer to obtain the lease or close the file. 
+ * </p> + * <p> + * This class also provides the following functionality: + * <ul> + * <li> + * It maintains a map from (namenode, user) pairs to lease renewers. + * The same {@link LeaseRenewer} instance is used for renewing lease + * for all the {@link org.apache.hadoop.hdfs.DFSClient} to the same namenode and the same user. + * </li> + * <li> + * Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}. + * Periodically the leases for all the clients are renewed. + * A client is removed from the list when the client is closed. + * </li> + * <li> + * A thread per namenode per user is used by the {@link LeaseRenewer} + * to renew the leases. + * </li> + * </ul> + * </p> + */ +@InterfaceAudience.Private +public class LeaseRenewer { + static final Log LOG = LogFactory.getLog(LeaseRenewer.class); + + static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L; + static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; + + /** Get a {@link LeaseRenewer} instance */ + public static LeaseRenewer getInstance(final String authority, + final UserGroupInformation ugi, final DFSClient dfsc) throws IOException { + final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi); + r.addClient(dfsc); + return r; + } + + /** + * A factory for sharing {@link LeaseRenewer} objects + * among {@link DFSClient} instances + * so that there is only one renewer per authority per user. 
+ */ + private static class Factory { + private static final Factory INSTANCE = new Factory(); + + private static class Key { + /** Namenode info */ + final String authority; + /** User info */ + final UserGroupInformation ugi; + + private Key(final String authority, final UserGroupInformation ugi) { + if (authority == null) { + throw new HadoopIllegalArgumentException("authority == null"); + } else if (ugi == null) { + throw new HadoopIllegalArgumentException("ugi == null"); + } + + this.authority = authority; + this.ugi = ugi; + } + + @Override + public int hashCode() { + return authority.hashCode() ^ ugi.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj != null && obj instanceof Key) { + final Key that = (Key)obj; + return this.authority.equals(that.authority) + && this.ugi.equals(that.ugi); + } + return false; + } + + @Override + public String toString() { + return ugi.getShortUserName() + "@" + authority; + } + } + + /** A map for per user per namenode renewers. */ + private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>(); + + /** Get a renewer. */ + private synchronized LeaseRenewer get(final String authority, + final UserGroupInformation ugi) { + final Key k = new Key(authority, ugi); + LeaseRenewer r = renewers.get(k); + if (r == null) { + r = new LeaseRenewer(k); + renewers.put(k, r); + } + return r; + } + + /** Remove the given renewer. */ + private synchronized void remove(final LeaseRenewer r) { + final LeaseRenewer stored = renewers.get(r.factorykey); + //Since a renewer may expire, the stored renewer can be different. + if (r == stored) { + if (!r.clientsRunning()) { + renewers.remove(r.factorykey); + } + } + } + } + + /** The time in milliseconds that the map became empty. 
*/ + private long emptyTime = Long.MAX_VALUE; + /** A fixed lease renewal time period in milliseconds */ + private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD/2; + + /** A daemon for renewing lease */ + private Daemon daemon = null; + /** Only the daemon with currentId should run. */ + private int currentId = 0; + + /** + * A period in milliseconds that the lease renewer thread should run + * after the map became empty. + * In other words, + * if the map is empty for a time period longer than the grace period, + * the renewer should terminate. + */ + private long gracePeriod; + /** + * The time period in milliseconds + * that the renewer sleeps for each iteration. + */ + private long sleepPeriod; + + private final Factory.Key factorykey; + + /** A list of clients corresponding to this renewer. */ + private final List<DFSClient> dfsclients = new ArrayList<DFSClient>(); + + /** + * A stringified stack trace of the call stack when the Lease Renewer + * was instantiated. This is only generated if trace-level logging is + * enabled on this class. + */ + private final String instantiationTrace; + + private LeaseRenewer(Factory.Key factorykey) { + this.factorykey = factorykey; + unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT); + + if (LOG.isTraceEnabled()) { + instantiationTrace = StringUtils.stringifyException( + new Throwable("TRACE")); + } else { + instantiationTrace = null; + } + } + + /** @return the renewal time in milliseconds. */ + private synchronized long getRenewalTime() { + return renewal; + } + + /** Add a client. */ + private synchronized void addClient(final DFSClient dfsc) { + for(DFSClient c : dfsclients) { + if (c == dfsc) { + //client already exists, nothing to do. 
+ return; + } + } + //client not found, add it + dfsclients.add(dfsc); + + //update renewal time + final int hdfsTimeout = dfsc.getConf().getHdfsTimeout(); + if (hdfsTimeout > 0) { + final long half = hdfsTimeout/2; + if (half < renewal) { + this.renewal = half; + } + } + } + + private synchronized boolean clientsRunning() { + for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) { + if (!i.next().isClientRunning()) { + i.remove(); + } + } + return !dfsclients.isEmpty(); + } + + private synchronized long getSleepPeriod() { + return sleepPeriod; + } + + /** Set the grace period and adjust the sleep period accordingly. */ + synchronized void setGraceSleepPeriod(final long gracePeriod) { + unsyncSetGraceSleepPeriod(gracePeriod); + } + + private void unsyncSetGraceSleepPeriod(final long gracePeriod) { + if (gracePeriod < 100L) { + throw new HadoopIllegalArgumentException(gracePeriod + + " = gracePeriod < 100ms is too small."); + } + this.gracePeriod = gracePeriod; + final long half = gracePeriod/2; + this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT? + half: LEASE_RENEWER_SLEEP_DEFAULT; + } + + /** Is the daemon running? */ + synchronized boolean isRunning() { + return daemon != null && daemon.isAlive(); + } + + /** Does this renewer have nothing to renew? */ + public boolean isEmpty() { + return dfsclients.isEmpty(); + } + + /** Used only by tests */ + synchronized String getDaemonName() { + return daemon.getName(); + } + + /** Is the empty period longer than the grace period? */ + private synchronized boolean isRenewerExpired() { + return emptyTime != Long.MAX_VALUE + && Time.monotonicNow() - emptyTime > gracePeriod; + } + + public synchronized void put(final long inodeId, final DFSOutputStream out, + final DFSClient dfsc) { + if (dfsc.isClientRunning()) { + if (!isRunning() || isRenewerExpired()) { + //start a new deamon with a new id. 
+ final int id = ++currentId; + daemon = new Daemon(new Runnable() { + @Override + public void run() { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " started"); + } + LeaseRenewer.this.run(id); + } catch(InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(LeaseRenewer.this.getClass().getSimpleName() + + " is interrupted.", e); + } + } finally { + synchronized(LeaseRenewer.this) { + Factory.INSTANCE.remove(LeaseRenewer.this); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " exited"); + } + } + } + + @Override + public String toString() { + return String.valueOf(LeaseRenewer.this); + } + }); + daemon.start(); + } + dfsc.putFileBeingWritten(inodeId, out); + emptyTime = Long.MAX_VALUE; + } + } + + @VisibleForTesting + synchronized void setEmptyTime(long time) { + emptyTime = time; + } + + /** Close a file. */ + public void closeFile(final long inodeId, final DFSClient dfsc) { + dfsc.removeFileBeingWritten(inodeId); + + synchronized(this) { + if (dfsc.isFilesBeingWrittenEmpty()) { + dfsclients.remove(dfsc); + } + //update emptyTime if necessary + if (emptyTime == Long.MAX_VALUE) { + for(DFSClient c : dfsclients) { + if (!c.isFilesBeingWrittenEmpty()) { + //found a non-empty file-being-written map + return; + } + } + //discover the first time that all file-being-written maps are empty. + emptyTime = Time.monotonicNow(); + } + } + } + + /** Close the given client. */ + public synchronized void closeClient(final DFSClient dfsc) { + dfsclients.remove(dfsc); + if (dfsclients.isEmpty()) { + if (!isRunning() || isRenewerExpired()) { + Factory.INSTANCE.remove(LeaseRenewer.this); + return; + } + if (emptyTime == Long.MAX_VALUE) { + //discover the first time that the client list is empty. 
+ emptyTime = Time.monotonicNow(); + } + } + + //update renewal time + if (renewal == dfsc.getConf().getHdfsTimeout()/2) { + long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD; + for(DFSClient c : dfsclients) { + final int timeout = c.getConf().getHdfsTimeout(); + if (timeout > 0 && timeout < min) { + min = timeout; + } + } + renewal = min/2; + } + } + + public void interruptAndJoin() throws InterruptedException { + Daemon daemonCopy = null; + synchronized (this) { + if (isRunning()) { + daemon.interrupt(); + daemonCopy = daemon; + } + } + + if (daemonCopy != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Wait for lease checker to terminate"); + } + daemonCopy.join(); + } + } + + private void renew() throws IOException { + final List<DFSClient> copies; + synchronized(this) { + copies = new ArrayList<DFSClient>(dfsclients); + } + //sort the client names for finding out repeated names. + Collections.sort(copies, new Comparator<DFSClient>() { + @Override + public int compare(final DFSClient left, final DFSClient right) { + return left.getClientName().compareTo(right.getClientName()); + } + }); + String previousName = ""; + for(int i = 0; i < copies.size(); i++) { + final DFSClient c = copies.get(i); + //skip if current client name is the same as the previous name. + if (!c.getClientName().equals(previousName)) { + if (!c.renewLease()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Did not renew lease for client " + + c); + } + continue; + } + previousName = c.getClientName(); + if (LOG.isDebugEnabled()) { + LOG.debug("Lease renewed for client " + previousName); + } + } + } + } + + /** + * Periodically check in with the namenode and renew all the leases + * when the lease period is half over. 
+ */ + private void run(final int id) throws InterruptedException { + for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted(); + Thread.sleep(getSleepPeriod())) { + final long elapsed = Time.monotonicNow() - lastRenewed; + if (elapsed >= getRenewalTime()) { + try { + renew(); + if (LOG.isDebugEnabled()) { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " executed"); + } + lastRenewed = Time.monotonicNow(); + } catch (SocketTimeoutException ie) { + LOG.warn("Failed to renew lease for " + clientsString() + " for " + + (elapsed/1000) + " seconds. Aborting ...", ie); + synchronized (this) { + while (!dfsclients.isEmpty()) { + dfsclients.get(0).abort(); + } + } + break; + } catch (IOException ie) { + LOG.warn("Failed to renew lease for " + clientsString() + " for " + + (elapsed/1000) + " seconds. Will retry shortly ...", ie); + } + } + + synchronized(this) { + if (id != currentId || isRenewerExpired()) { + if (LOG.isDebugEnabled()) { + if (id != currentId) { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " is not current"); + } else { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " expired"); + } + } + //no longer the current daemon or expired + return; + } + + // if no clients are in running state or there is no more clients + // registered with this renewer, stop the daemon after the grace + // period. 
+ if (!clientsRunning() && emptyTime == Long.MAX_VALUE) { + emptyTime = Time.monotonicNow(); + } + } + } + } + + @Override + public String toString() { + String s = getClass().getSimpleName() + ":" + factorykey; + if (LOG.isTraceEnabled()) { + return s + ", clients=" + clientsString() + + ", created at " + instantiationTrace; + } + return s; + } + + /** Get the names of all clients */ + private synchronized String clientsString() { + if (dfsclients.isEmpty()) { + return "[]"; + } else { + final StringBuilder b = new StringBuilder("[").append( + dfsclients.get(0).getClientName()); + for(int i = 1; i < dfsclients.size(); i++) { + b.append(", ").append(dfsclients.get(i).getClientName()); + } + return b.append("]").toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 0689a53..837665e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.mock; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketTimeoutException; @@ -64,6 +65,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; import 
org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; @@ -264,78 +266,84 @@ public class TestDistributedFileSystem { { final DistributedFileSystem dfs = cluster.getFileSystem(); - dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace); - assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + Method setMethod = dfs.dfs.getLeaseRenewer().getClass() + .getDeclaredMethod("setGraceSleepPeriod", long.class); + setMethod.setAccessible(true); + setMethod.invoke(dfs.dfs.getLeaseRenewer(), grace); + Method checkMethod = dfs.dfs.getLeaseRenewer().getClass() + .getDeclaredMethod("isRunning"); + checkMethod.setAccessible(true); + assertFalse((boolean) checkMethod.invoke(dfs.dfs.getLeaseRenewer())); { //create a file final FSDataOutputStream out = dfs.create(filepaths[0]); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //write something out.writeLong(millis); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //close out.close(); Thread.sleep(grace/4*3); //within grace period - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); for(int i = 0; i < 3; i++) { - if (dfs.dfs.getLeaseRenewer().isRunning()) { + if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) { Thread.sleep(grace/2); } } //passed grace period - assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); } { //create file1 final FSDataOutputStream out1 = dfs.create(filepaths[1]); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //create file2 final FSDataOutputStream out2 = dfs.create(filepaths[2]); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + 
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //write something to file1 out1.writeLong(millis); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //close file1 out1.close(); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //write something to file2 out2.writeLong(millis); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //close file2 out2.close(); Thread.sleep(grace/4*3); //within grace period - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); } { //create file3 final FSDataOutputStream out3 = dfs.create(filepaths[3]); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); Thread.sleep(grace/4*3); //passed previous grace period, should still running - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //write something to file3 out3.writeLong(millis); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //close file3 out3.close(); - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); Thread.sleep(grace/4*3); //within grace period - assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); for(int i = 0; i < 3; i++) { - if (dfs.dfs.getLeaseRenewer().isRunning()) { + if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) { Thread.sleep(grace/2); } } //passed grace period - assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); } dfs.close(); @@ 
-364,15 +372,18 @@ public class TestDistributedFileSystem { { final DistributedFileSystem dfs = cluster.getFileSystem(); - assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + Method checkMethod = dfs.dfs.getLeaseRenewer().getClass() + .getDeclaredMethod("isRunning"); + checkMethod.setAccessible(true); + assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); //open and check the file FSDataInputStream in = dfs.open(filepaths[0]); - assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); assertEquals(millis, in.readLong()); - assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); in.close(); - assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())); dfs.close(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java index 1cf7add..9b5a7c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java deleted file mode 100644 index f091db7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertSame; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.Time; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import com.google.common.base.Supplier; - -public class TestLeaseRenewer { - private final String FAKE_AUTHORITY="hdfs://nn1/"; - private final UserGroupInformation FAKE_UGI_A = - UserGroupInformation.createUserForTesting( - "myuser", new String[]{"group1"}); - private final UserGroupInformation FAKE_UGI_B = - UserGroupInformation.createUserForTesting( - "myuser", new String[]{"group1"}); - - private DFSClient MOCK_DFSCLIENT; - private LeaseRenewer renewer; - - /** Cause renewals often so test runs quickly. 
*/ - private static final long FAST_GRACE_PERIOD = 100L; - - @Before - public void setupMocksAndRenewer() throws IOException { - MOCK_DFSCLIENT = createMockClient(); - - renewer = LeaseRenewer.getInstance( - FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); - renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD); -} - - private DFSClient createMockClient() { - final DfsClientConf mockConf = Mockito.mock(DfsClientConf.class); - Mockito.doReturn((int)FAST_GRACE_PERIOD).when(mockConf).getHdfsTimeout(); - - DFSClient mock = Mockito.mock(DFSClient.class); - Mockito.doReturn(true).when(mock).isClientRunning(); - Mockito.doReturn(mockConf).when(mock).getConf(); - Mockito.doReturn("myclient").when(mock).getClientName(); - return mock; - } - - @Test - public void testInstanceSharing() throws IOException { - // Two lease renewers with the same UGI should return - // the same instance - LeaseRenewer lr = LeaseRenewer.getInstance( - FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); - LeaseRenewer lr2 = LeaseRenewer.getInstance( - FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); - Assert.assertSame(lr, lr2); - - // But a different UGI should return a different instance - LeaseRenewer lr3 = LeaseRenewer.getInstance( - FAKE_AUTHORITY, FAKE_UGI_B, MOCK_DFSCLIENT); - Assert.assertNotSame(lr, lr3); - - // A different authority with same UGI should also be a different - // instance. - LeaseRenewer lr4 = LeaseRenewer.getInstance( - "someOtherAuthority", FAKE_UGI_B, MOCK_DFSCLIENT); - Assert.assertNotSame(lr, lr4); - Assert.assertNotSame(lr3, lr4); - } - - @Test - public void testRenewal() throws Exception { - // Keep track of how many times the lease gets renewed - final AtomicInteger leaseRenewalCount = new AtomicInteger(); - Mockito.doAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { - leaseRenewalCount.incrementAndGet(); - return true; - } - }).when(MOCK_DFSCLIENT).renewLease(); - - - // Set up a file so that we start renewing our lease. 
- DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); - long fileId = 123L; - renewer.put(fileId, mockStream, MOCK_DFSCLIENT); - - // Wait for lease to get renewed - long failTime = Time.monotonicNow() + 5000; - while (Time.monotonicNow() < failTime && - leaseRenewalCount.get() == 0) { - Thread.sleep(50); - } - if (leaseRenewalCount.get() == 0) { - Assert.fail("Did not renew lease at all!"); - } - - renewer.closeFile(fileId, MOCK_DFSCLIENT); - } - - /** - * Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles - * to several DFSClients with the same name, the first of which has no files - * open. Previously, this was causing the lease to not get renewed. - */ - @Test - public void testManyDfsClientsWhereSomeNotOpen() throws Exception { - // First DFSClient has no files open so doesn't renew leases. - final DFSClient mockClient1 = createMockClient(); - Mockito.doReturn(false).when(mockClient1).renewLease(); - assertSame(renewer, LeaseRenewer.getInstance( - FAKE_AUTHORITY, FAKE_UGI_A, mockClient1)); - - // Set up a file so that we start renewing our lease. - DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class); - long fileId = 456L; - renewer.put(fileId, mockStream1, mockClient1); - - // Second DFSClient does renew lease - final DFSClient mockClient2 = createMockClient(); - Mockito.doReturn(true).when(mockClient2).renewLease(); - assertSame(renewer, LeaseRenewer.getInstance( - FAKE_AUTHORITY, FAKE_UGI_A, mockClient2)); - - // Set up a file so that we start renewing our lease. 
- DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class); - renewer.put(fileId, mockStream2, mockClient2); - - - // Wait for lease to get renewed - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - try { - Mockito.verify(mockClient1, Mockito.atLeastOnce()).renewLease(); - Mockito.verify(mockClient2, Mockito.atLeastOnce()).renewLease(); - return true; - } catch (AssertionError err) { - LeaseRenewer.LOG.warn("Not yet satisfied", err); - return false; - } catch (IOException e) { - // should not throw! - throw new RuntimeException(e); - } - } - }, 100, 10000); - - renewer.closeFile(fileId, mockClient1); - renewer.closeFile(fileId, mockClient2); - } - - @Test - public void testThreadName() throws Exception { - DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); - long fileId = 789L; - Assert.assertFalse("Renewer not initially running", - renewer.isRunning()); - - // Pretend to open a file - renewer.put(fileId, mockStream, MOCK_DFSCLIENT); - - Assert.assertTrue("Renewer should have started running", - renewer.isRunning()); - - // Check the thread name is reasonable - String threadName = renewer.getDaemonName(); - Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName); - - // Pretend to close the file - renewer.closeFile(fileId, MOCK_DFSCLIENT); - renewer.setEmptyTime(Time.monotonicNow()); - - // Should stop the renewer running within a few seconds - long failTime = Time.monotonicNow() + 5000; - while (renewer.isRunning() && Time.monotonicNow() < failTime) { - Thread.sleep(50); - } - Assert.assertFalse(renewer.isRunning()); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d019c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java new file mode 100644 index 0000000..a4e00d9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client.impl; + +import static org.junit.Assert.assertSame; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.base.Supplier; + +public class TestLeaseRenewer { + private final String FAKE_AUTHORITY="hdfs://nn1/"; + private final UserGroupInformation FAKE_UGI_A = + UserGroupInformation.createUserForTesting( + "myuser", new String[]{"group1"}); + private final UserGroupInformation FAKE_UGI_B = + UserGroupInformation.createUserForTesting( + "myuser", new String[]{"group1"}); + + private DFSClient MOCK_DFSCLIENT; + private LeaseRenewer renewer; + + /** Cause renewals often so test runs quickly. 
*/ + private static final long FAST_GRACE_PERIOD = 100L; + + @Before + public void setupMocksAndRenewer() throws IOException { + MOCK_DFSCLIENT = createMockClient(); + + renewer = LeaseRenewer.getInstance( + FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); + renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD); +} + + private DFSClient createMockClient() { + final DfsClientConf mockConf = Mockito.mock(DfsClientConf.class); + Mockito.doReturn((int)FAST_GRACE_PERIOD).when(mockConf).getHdfsTimeout(); + + DFSClient mock = Mockito.mock(DFSClient.class); + Mockito.doReturn(true).when(mock).isClientRunning(); + Mockito.doReturn(mockConf).when(mock).getConf(); + Mockito.doReturn("myclient").when(mock).getClientName(); + return mock; + } + + @Test + public void testInstanceSharing() throws IOException { + // Two lease renewers with the same UGI should return + // the same instance + LeaseRenewer lr = LeaseRenewer.getInstance( + FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); + LeaseRenewer lr2 = LeaseRenewer.getInstance( + FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT); + Assert.assertSame(lr, lr2); + + // But a different UGI should return a different instance + LeaseRenewer lr3 = LeaseRenewer.getInstance( + FAKE_AUTHORITY, FAKE_UGI_B, MOCK_DFSCLIENT); + Assert.assertNotSame(lr, lr3); + + // A different authority with same UGI should also be a different + // instance. + LeaseRenewer lr4 = LeaseRenewer.getInstance( + "someOtherAuthority", FAKE_UGI_B, MOCK_DFSCLIENT); + Assert.assertNotSame(lr, lr4); + Assert.assertNotSame(lr3, lr4); + } + + @Test + public void testRenewal() throws Exception { + // Keep track of how many times the lease gets renewed + final AtomicInteger leaseRenewalCount = new AtomicInteger(); + Mockito.doAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + leaseRenewalCount.incrementAndGet(); + return true; + } + }).when(MOCK_DFSCLIENT).renewLease(); + + + // Set up a file so that we start renewing our lease. 
+ DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); + long fileId = 123L; + renewer.put(fileId, mockStream, MOCK_DFSCLIENT); + + // Wait for lease to get renewed + long failTime = Time.monotonicNow() + 5000; + while (Time.monotonicNow() < failTime && + leaseRenewalCount.get() == 0) { + Thread.sleep(50); + } + if (leaseRenewalCount.get() == 0) { + Assert.fail("Did not renew lease at all!"); + } + + renewer.closeFile(fileId, MOCK_DFSCLIENT); + } + + /** + * Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles + * to several DFSClients with the same name, the first of which has no files + * open. Previously, this was causing the lease to not get renewed. + */ + @Test + public void testManyDfsClientsWhereSomeNotOpen() throws Exception { + // First DFSClient has no files open so doesn't renew leases. + final DFSClient mockClient1 = createMockClient(); + Mockito.doReturn(false).when(mockClient1).renewLease(); + assertSame(renewer, LeaseRenewer.getInstance( + FAKE_AUTHORITY, FAKE_UGI_A, mockClient1)); + + // Set up a file so that we start renewing our lease. + DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class); + long fileId = 456L; + renewer.put(fileId, mockStream1, mockClient1); + + // Second DFSClient does renew lease + final DFSClient mockClient2 = createMockClient(); + Mockito.doReturn(true).when(mockClient2).renewLease(); + assertSame(renewer, LeaseRenewer.getInstance( + FAKE_AUTHORITY, FAKE_UGI_A, mockClient2)); + + // Set up a file so that we start renewing our lease. 
+ DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class); + renewer.put(fileId, mockStream2, mockClient2); + + + // Wait for lease to get renewed + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + try { + Mockito.verify(mockClient1, Mockito.atLeastOnce()).renewLease(); + Mockito.verify(mockClient2, Mockito.atLeastOnce()).renewLease(); + return true; + } catch (AssertionError err) { + LeaseRenewer.LOG.warn("Not yet satisfied", err); + return false; + } catch (IOException e) { + // should not throw! + throw new RuntimeException(e); + } + } + }, 100, 10000); + + renewer.closeFile(fileId, mockClient1); + renewer.closeFile(fileId, mockClient2); + } + + @Test + public void testThreadName() throws Exception { + DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); + long fileId = 789L; + Assert.assertFalse("Renewer not initially running", + renewer.isRunning()); + + // Pretend to open a file + renewer.put(fileId, mockStream, MOCK_DFSCLIENT); + + Assert.assertTrue("Renewer should have started running", + renewer.isRunning()); + + // Check the thread name is reasonable + String threadName = renewer.getDaemonName(); + Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName); + + // Pretend to close the file + renewer.closeFile(fileId, MOCK_DFSCLIENT); + renewer.setEmptyTime(Time.monotonicNow()); + + // Should stop the renewer running within a few seconds + long failTime = Time.monotonicNow() + 5000; + while (renewer.isRunning() && Time.monotonicNow() < failTime) { + Thread.sleep(50); + } + Assert.assertFalse(renewer.isRunning()); + } + +}
