Author: cmccabe Date: Wed Dec 4 20:06:25 2013 New Revision: 1547895 URL: http://svn.apache.org/r1547895 Log: HDFS-5555. CacheAdmin commands fail when first listed NameNode is in Standby (jxiang via cmccabe)
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1547895&r1=1547894&r2=1547895&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Dec 4 20:06:25 2013 @@ -421,6 +421,9 @@ Trunk (Unreleased) HDFS-5562. TestCacheDirectives and TestFsDatasetCache should stub out native mlock. (Colin McCabe and Akira Ajisaka via wang) + HDFS-5555. CacheAdmin commands fail when first listed NameNode is in + Standby (jxiang via cmccabe) + Release 2.4.0 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1547895&r1=1547894&r2=1547895&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Dec 4 20:06:25 2013 @@ -109,8 +109,10 @@ import org.apache.hadoop.hdfs.client.Cli import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolIterator; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; @@ -2324,12 +2326,7 @@ public class DFSClient implements java.i public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( CacheDirectiveInfo filter) throws IOException { - checkOpen(); - try { - return namenode.listCacheDirectives(0, filter); - } catch (RemoteException re) { - throw re.unwrapRemoteException(); - } + return new CacheDirectiveIterator(namenode, filter); } public void addCachePool(CachePoolInfo info) throws IOException { @@ -2360,12 +2357,7 @@ public class DFSClient implements java.i } public RemoteIterator<CachePoolEntry> listCachePools() throws IOException { - checkOpen(); - try { - return namenode.listCachePools(""); - } catch (RemoteException re) { - throw re.unwrapRemoteException(); - } + return new CachePoolIterator(namenode); } /** Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java?rev=1547895&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java Wed Dec 4 20:06:25 2013 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator; + +/** + * CacheDirectiveIterator is a remote iterator that iterates cache directives. + * It supports retrying in case of namenode failover. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class CacheDirectiveIterator + extends BatchedRemoteIterator<Long, CacheDirectiveEntry> { + + private final CacheDirectiveInfo filter; + private final ClientProtocol namenode; + + public CacheDirectiveIterator(ClientProtocol namenode, + CacheDirectiveInfo filter) { + super(Long.valueOf(0)); + this.namenode = namenode; + this.filter = filter; + } + + @Override + public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey) + throws IOException { + return namenode.listCacheDirectives(prevKey, filter); + } + + @Override + public Long elementToPrevKey(CacheDirectiveEntry entry) { + return entry.getInfo().getId(); + } +} Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java?rev=1547895&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java Wed Dec 4 20:06:25 2013 @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator; + +/** + * CachePoolIterator is a remote iterator that iterates cache pools. + * It supports retrying in case of namenode failover. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class CachePoolIterator + extends BatchedRemoteIterator<String, CachePoolEntry> { + + private final ClientProtocol namenode; + + public CachePoolIterator(ClientProtocol namenode) { + super(""); + this.namenode = namenode; + } + + @Override + public BatchedEntries<CachePoolEntry> makeRequest(String prevKey) + throws IOException { + return namenode.listCachePools(prevKey); + } + + @Override + public String elementToPrevKey(CachePoolEntry entry) { + return entry.getInfo().getPoolName(); + } +} Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1547895&r1=1547894&r2=1547895&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Dec 4 20:06:25 2013 @@ -28,9 +28,9 @@ import org.apache.hadoop.fs.FileAlreadyE import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -1134,10 +1134,10 @@ public interface ClientProtocol { * listCacheDirectives. * @param filter Parameters to use to filter the list results, * or null to display all directives visible to us. - * @return A RemoteIterator which returns CacheDirectiveInfo objects. + * @return A batch of CacheDirectiveEntry objects. */ @Idempotent - public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( + public BatchedEntries<CacheDirectiveEntry> listCacheDirectives( long prevId, CacheDirectiveInfo filter) throws IOException; /** @@ -1175,9 +1175,9 @@ public interface ClientProtocol { * * @param prevPool name of the last pool listed, or the empty string if this is * the first invocation of listCachePools - * @return A RemoteIterator which returns CachePool objects. + * @return A batch of CachePoolEntry objects. */ @Idempotent - public RemoteIterator<CachePoolEntry> listCachePools(String prevPool) + public BatchedEntries<CachePoolEntry> listCachePools(String prevPool) throws IOException; } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1547895&r1=1547894&r2=1547895&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Wed Dec 4 20:06:25 2013 @@ -24,12 +24,9 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.Options.Rename; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; -import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; @@ -52,8 +49,6 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto; @@ -109,7 +104,6 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; @@ -176,9 +170,7 @@ import org.apache.hadoop.security.proto. import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto; import org.apache.hadoop.security.token.Token; -import org.apache.commons.lang.StringUtils; -import com.google.common.primitives.Shorts; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -1079,21 +1071,13 @@ public class ClientNamenodeProtocolServe try { CacheDirectiveInfo filter = PBHelper.convert(request.getFilter()); - RemoteIterator<CacheDirectiveEntry> iter = - server.listCacheDirectives(request.getPrevId(), filter); + BatchedEntries<CacheDirectiveEntry> entries = + server.listCacheDirectives(request.getPrevId(), filter); ListCacheDirectivesResponseProto.Builder builder = ListCacheDirectivesResponseProto.newBuilder(); - long prevId = 0; - while (iter.hasNext()) { - CacheDirectiveEntry entry = iter.next(); - builder.addElements(PBHelper.convert(entry)); - prevId = entry.getInfo().getId(); - } - if (prevId == 0) { - builder.setHasMore(false); - } else { - iter = server.listCacheDirectives(prevId, filter); - builder.setHasMore(iter.hasNext()); + builder.setHasMore(entries.hasMore()); + for (int i=0, n=entries.size(); i<n; i++) { + builder.addElements(PBHelper.convert(entries.get(i))); } return builder.build(); } catch (IOException e) { @@ -1138,22 +1122,13 @@ public class ClientNamenodeProtocolServe public ListCachePoolsResponseProto listCachePools(RpcController controller, ListCachePoolsRequestProto request) throws ServiceException { try { - RemoteIterator<CachePoolEntry> iter = + BatchedEntries<CachePoolEntry> entries = server.listCachePools(request.getPrevPoolName()); ListCachePoolsResponseProto.Builder responseBuilder = ListCachePoolsResponseProto.newBuilder(); - String prevPoolName = null; - while (iter.hasNext()) { - CachePoolEntry entry = iter.next(); - responseBuilder.addEntries(PBHelper.convert(entry)); - prevPoolName = entry.getInfo().getPoolName(); - } - // fill in hasNext - if (prevPoolName == null) { - responseBuilder.setHasMore(false); - } else { - iter = server.listCachePools(prevPoolName); - responseBuilder.setHasMore(iter.hasNext()); + responseBuilder.setHasMore(entries.hasMore()); + for (int i=0, n=entries.size(); i<n; i++) { + responseBuilder.addEntries(PBHelper.convert(entries.get(i))); } return responseBuilder.build(); } catch (IOException e) { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1547895&r1=1547894&r2=1547895&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Wed Dec 4 20:06:25 2013 @@ -24,7 +24,6 @@ import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.BatchedRemoteIterator; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -32,7 +31,6 @@ import org.apache.hadoop.fs.FileAlreadyE import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; @@ -1062,46 +1060,23 @@ public class ClientNamenodeProtocolTrans } } - private class CacheEntriesIterator - extends BatchedRemoteIterator<Long, CacheDirectiveEntry> { - private final CacheDirectiveInfo filter; - - public CacheEntriesIterator(long prevKey, - CacheDirectiveInfo filter) { - super(prevKey); - this.filter = filter; - } - - @Override - public BatchedEntries<CacheDirectiveEntry> makeRequest( - Long nextKey) throws IOException { - ListCacheDirectivesResponseProto response; - try { - response = rpcProxy.listCacheDirectives(null, - ListCacheDirectivesRequestProto.newBuilder(). - setPrevId(nextKey). - setFilter(PBHelper.convert(filter)). - build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - return new BatchedCacheEntries(response); - } - - @Override - public Long elementToPrevKey(CacheDirectiveEntry element) { - return element.getInfo().getId(); - } - } - @Override - public RemoteIterator<CacheDirectiveEntry> + public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId, CacheDirectiveInfo filter) throws IOException { if (filter == null) { filter = new CacheDirectiveInfo.Builder().build(); } - return new CacheEntriesIterator(prevId, filter); + try { + return new BatchedCacheEntries( + rpcProxy.listCacheDirectives(null, + ListCacheDirectivesRequestProto.newBuilder(). + setPrevId(prevId). + setFilter(PBHelper.convert(filter)). + build())); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } } @Override @@ -1164,35 +1139,16 @@ public class ClientNamenodeProtocolTrans } } - private class CachePoolIterator - extends BatchedRemoteIterator<String, CachePoolEntry> { - - public CachePoolIterator(String prevKey) { - super(prevKey); - } - - @Override - public BatchedEntries<CachePoolEntry> makeRequest(String prevKey) - throws IOException { - try { - return new BatchedCachePoolEntries( - rpcProxy.listCachePools(null, - ListCachePoolsRequestProto.newBuilder(). - setPrevPoolName(prevKey).build())); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - @Override - public String elementToPrevKey(CachePoolEntry entry) { - return entry.getInfo().getPoolName(); - } - } - @Override - public RemoteIterator<CachePoolEntry> listCachePools(String prevKey) + public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) throws IOException { - return new CachePoolIterator(prevKey); + try { + return new BatchedCachePoolEntries( + rpcProxy.listCachePools(null, + ListCachePoolsRequestProto.newBuilder(). + setPrevPoolName(prevKey).build())); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1547895&r1=1547894&r2=1547895&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Dec 4 20:06:25 2013 @@ -36,7 +36,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BatchedRemoteIterator; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -46,8 +45,8 @@ import org.apache.hadoop.fs.InvalidPathE import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.ha.HAServiceStatus; @@ -1251,36 +1250,13 @@ class NameNodeRpcServer implements Namen namesystem.removeCacheDirective(id); } - private class ServerSideCacheEntriesIterator - extends BatchedRemoteIterator<Long, CacheDirectiveEntry> { - - private final CacheDirectiveInfo filter; - - public ServerSideCacheEntriesIterator (Long firstKey, - CacheDirectiveInfo filter) { - super(firstKey); - this.filter = filter; - } - - @Override - public BatchedEntries<CacheDirectiveEntry> makeRequest( - Long nextKey) throws IOException { - return namesystem.listCacheDirectives(nextKey, filter); - } - - @Override - public Long elementToPrevKey(CacheDirectiveEntry entry) { - return entry.getInfo().getId(); - } - } - @Override - public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(long prevId, + public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId, CacheDirectiveInfo filter) throws IOException { if (filter == null) { filter = new CacheDirectiveInfo.Builder().build(); } - return new ServerSideCacheEntriesIterator(prevId, filter); + return namesystem.listCacheDirectives(prevId, filter); } @Override @@ -1298,28 +1274,9 @@ class NameNodeRpcServer implements Namen namesystem.removeCachePool(cachePoolName); } - private class ServerSideCachePoolIterator - extends BatchedRemoteIterator<String, CachePoolEntry> { - - public ServerSideCachePoolIterator(String prevKey) { - super(prevKey); - } - - @Override - public BatchedEntries<CachePoolEntry> makeRequest(String prevKey) - throws IOException { - return namesystem.listCachePools(prevKey); - } - - @Override - public String elementToPrevKey(CachePoolEntry entry) { - return entry.getInfo().getPoolName(); - } - } - @Override - public RemoteIterator<CachePoolEntry> listCachePools(String prevKey) + public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) throws IOException { - return new ServerSideCachePoolIterator(prevKey); + return namesystem.listCachePools(prevKey != null ? prevKey : ""); } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1547895&r1=1547894&r2=1547895&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Wed Dec 4 20:06:25 2013 @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.HdfsConfig import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; @@ -763,7 +764,7 @@ public class TestCacheDirectives { } // Uncache and check each path in sequence RemoteIterator<CacheDirectiveEntry> entries = - nnRpc.listCacheDirectives(0, null); + new CacheDirectiveIterator(nnRpc, null); for (int i=0; i<numFiles; i++) { CacheDirectiveEntry entry = entries.next(); nnRpc.removeCacheDirective(entry.getInfo().getId()); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java?rev=1547895&r1=1547894&r2=1547895&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java Wed Dec 4 20:06:25 2013 @@ -29,6 +29,7 @@ import java.net.URI; import java.net.UnknownHostException; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Random; @@ -86,6 +87,7 @@ public class TestRetryCacheWithHA { private static final int BlockSize = 1024; private static final short DataNodes = 3; private static final int CHECKTIMES = 10; + private static final int ResponseSize = 3; private MiniDFSCluster cluster; private DistributedFileSystem dfs; @@ -120,6 +122,8 @@ public class TestRetryCacheWithHA { @Before public void setup() throws Exception { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, ResponseSize); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, ResponseSize); cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(DataNodes).build(); @@ -1175,4 +1179,92 @@ public class TestRetryCacheWithHA { + results.get(op.name)); } } + + /** + * Add a list of cache pools, list cache pools, + * switch active NN, and list cache pools again. + */ + @Test (timeout=60000) + public void testListCachePools() throws Exception { + final int poolCount = 7; + HashSet<String> poolNames = new HashSet<String>(poolCount); + for (int i=0; i<poolCount; i++) { + String poolName = "testListCachePools-" + i; + dfs.addCachePool(new CachePoolInfo(poolName)); + poolNames.add(poolName); + } + listCachePools(poolNames, 0); + + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + cluster.waitActive(1); + listCachePools(poolNames, 1); + } + + /** + * Add a list of cache directives, list cache directives, + * switch active NN, and list cache directives again. + */ + @Test (timeout=60000) + public void testListCacheDirectives() throws Exception { + final int poolCount = 7; + HashSet<String> poolNames = new HashSet<String>(poolCount); + Path path = new Path("/p"); + for (int i=0; i<poolCount; i++) { + String poolName = "testListCacheDirectives-" + i; + CacheDirectiveInfo directiveInfo = + new CacheDirectiveInfo.Builder().setPool(poolName).setPath(path).build(); + dfs.addCachePool(new CachePoolInfo(poolName)); + dfs.addCacheDirective(directiveInfo); + poolNames.add(poolName); + } + listCacheDirectives(poolNames, 0); + + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + cluster.waitActive(1); + listCacheDirectives(poolNames, 1); + } + + @SuppressWarnings("unchecked") + private void listCachePools( + HashSet<String> poolNames, int active) throws Exception { + HashSet<String> tmpNames = (HashSet<String>)poolNames.clone(); + RemoteIterator<CachePoolEntry> pools = dfs.listCachePools(); + int poolCount = poolNames.size(); + for (int i=0; i<poolCount; i++) { + CachePoolEntry pool = pools.next(); + String pollName = pool.getInfo().getPoolName(); + assertTrue("The pool name should be expected", tmpNames.remove(pollName)); + if (i % 2 == 0) { + int standby = active; + active = (standby == 0) ? 1 : 0; + cluster.transitionToStandby(standby); + cluster.transitionToActive(active); + cluster.waitActive(active); + } + } + assertTrue("All pools must be found", tmpNames.isEmpty()); + } + + @SuppressWarnings("unchecked") + private void listCacheDirectives( + HashSet<String> poolNames, int active) throws Exception { + HashSet<String> tmpNames = (HashSet<String>)poolNames.clone(); + RemoteIterator<CacheDirectiveEntry> directives = dfs.listCacheDirectives(null); + int poolCount = poolNames.size(); + for (int i=0; i<poolCount; i++) { + CacheDirectiveEntry directive = directives.next(); + String pollName = directive.getInfo().getPool(); + assertTrue("The pool name should be expected", tmpNames.remove(pollName)); + if (i % 2 == 0) { + int standby = active; + active = (standby == 0) ? 1 : 0; + cluster.transitionToStandby(standby); + cluster.transitionToActive(active); + cluster.waitActive(active); + } + } + assertTrue("All pools must be found", tmpNames.isEmpty()); + } }