Author: jing9
Date: Tue Jan 21 01:17:35 2014
New Revision: 1559874

URL: http://svn.apache.org/r1559874
Log:
HDFS-5774. Serialize CachePool directives in protobuf. Contributed by Haohui Mai.
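For context on the change below: the cache pools and directives are persisted as length-delimited protobuf messages inside a new CACHE_MANAGER fsimage section. The section is one CacheManagerSection header followed by numPools CachePoolInfoProto records and numDirectives CacheDirectiveInfoProto records, each written with writeDelimitedTo(). The following sketch illustrates that framing with a round trip through a byte buffer; the demo class and values are hypothetical, and it assumes the generated protobuf classes from this branch are on the classpath.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
    import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;

    // Hypothetical demo of the CACHE_MANAGER framing: a CacheManagerSection
    // header whose counts say how many delimited records follow it.
    public class CacheSectionFramingDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // Header first; numPools/numDirectives bound the loops on the read side.
        CacheManagerSection.newBuilder()
            .setNextDirectiveId(2).setNumPools(1).setNumDirectives(0)
            .build().writeDelimitedTo(out);

        // One pool record, length-delimited exactly like the header.
        CachePoolInfoProto.newBuilder()
            .setPoolName("pool1").setOwnerName("hdfs").setMode(0755)
            .build().writeDelimitedTo(out);

        // Reading mirrors the writes: parse the header, then loop on the counts.
        ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
        CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(in);
        for (int i = 0; i < s.getNumPools(); ++i) {
          System.out.println(CachePoolInfoProto.parseDelimitedFrom(in).getPoolName());
        }
      }
    }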
Modified:
    hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt
    hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
    hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
    hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt?rev=1559874&r1=1559873&r2=1559874&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt Tue Jan 21 01:17:35 2014
@@ -14,3 +14,5 @@ HDFS-5698 subtasks
     jing9)
 
     HDFS-5743. Use protobuf to serialize snapshot information. (jing9)
+
+    HDFS-5774. Serialize CachePool directives in protobuf. (Haohui Mai via jing9)

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1559874&r1=1559873&r2=1559874&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Tue Jan 21 01:17:35 2014
@@ -50,8 +50,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -62,11 +64,15 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -81,6 +87,7 @@ import org.apache.hadoop.util.LightWeigh
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 
 /**
  * The Cache Manager handles caching on DataNodes.
@@ -167,6 +174,19 @@ public final class CacheManager {
    */
   private CacheReplicationMonitor monitor;
 
+  public static final class PersistState {
+    public final CacheManagerSection section;
+    public final List<CachePoolInfoProto> pools;
+    public final List<CacheDirectiveInfoProto> directives;
+
+    public PersistState(CacheManagerSection section,
+        List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives) {
+      this.section = section;
+      this.pools = pools;
+      this.directives = directives;
+    }
+  }
+
   CacheManager(FSNamesystem namesystem, Configuration conf,
       BlockManager blockManager) {
     this.namesystem = namesystem;
@@ -933,6 +953,57 @@ public final class CacheManager {
     serializerCompat.save(out, sdPath);
   }
 
+  public PersistState saveState() throws IOException {
+    ArrayList<CachePoolInfoProto> pools = Lists
+        .newArrayListWithCapacity(cachePools.size());
+    ArrayList<CacheDirectiveInfoProto> directives = Lists
+        .newArrayListWithCapacity(directivesById.size());
+
+    for (CachePool pool : cachePools.values()) {
+      CachePoolInfo p = pool.getInfo(true);
+      CachePoolInfoProto.Builder b = CachePoolInfoProto.newBuilder();
+
+      if (p.getOwnerName() != null)
+        b.setOwnerName(p.getOwnerName());
+
+      if (p.getGroupName() != null)
+        b.setGroupName(p.getGroupName());
+
+      if (p.getMode() != null)
+        b.setMode(p.getMode().toShort());
+
+      if (p.getLimit() != null)
+        b.setLimit(p.getLimit());
+
+      pools.add(b.build());
+    }
+
+    for (CacheDirective directive : directivesById.values()) {
+      CacheDirectiveInfo info = directive.toInfo();
+      CacheDirectiveInfoProto.Builder b = CacheDirectiveInfoProto.newBuilder();
+
+      if (info.getPath() != null)
+        b.setPath(info.getPath().toUri().getPath());
+
+      if (info.getReplication() != null)
+        b.setReplication(info.getReplication());
+
+      if (info.getPool() != null)
+        b.setPool(info.getPool());
+
+      if (info.getExpiration() != null)
+        b.setExpiration(CacheDirectiveInfoExpirationProto.newBuilder()
+            .setMillis(info.getExpiration().getMillis()));
+
+      directives.add(b.build());
+    }
+    CacheManagerSection s = CacheManagerSection.newBuilder()
+        .setNextDirectiveId(nextDirectiveId).setNumPools(pools.size())
+        .setNumDirectives(directives.size()).build();
+
+    return new PersistState(s, pools, directives);
+  }
+
   /**
    * Reloads CacheManager state from the passed DataInput. Used during namenode
    * startup to restore CacheManager state from an FSImage.
@@ -943,6 +1014,56 @@ public final class CacheManager {
     serializerCompat.load(in);
   }
 
+  public void loadState(PersistState s) throws IOException {
+    nextDirectiveId = s.section.getNextDirectiveId();
+    for (CachePoolInfoProto p : s.pools) {
+      CachePoolInfo info = new CachePoolInfo(p.getPoolName());
+      if (p.hasOwnerName())
+        info.setOwnerName(p.getOwnerName());
+
+      if (p.hasGroupName())
+        info.setGroupName(p.getGroupName());
+
+      if (p.hasMode())
+        info.setMode(new FsPermission((short) p.getMode()));
+
+      if (p.hasLimit())
+        info.setLimit(p.getLimit());
+
+      addCachePool(info);
+    }
+
+    for (CacheDirectiveInfoProto p : s.directives) {
+      // Get pool reference by looking it up in the map
+      final String poolName = p.getPool();
+      CacheDirective directive = new CacheDirective(p.getId(), new Path(
+          p.getPath()).toUri().getPath(), (short) p.getReplication(), p
+          .getExpiration().getMillis());
+      addCacheDirective(poolName, directive);
+    }
+  }
+
+  private void addCacheDirective(final String poolName,
+      final CacheDirective directive) throws IOException {
+    CachePool pool = cachePools.get(poolName);
+    if (pool == null) {
+      throw new IOException("Directive refers to pool " + poolName +
+          ", which does not exist.");
+    }
+    boolean addedDirective = pool.getDirectiveList().add(directive);
+    assert addedDirective;
+    if (directivesById.put(directive.getId(), directive) != null) {
+      throw new IOException("A directive with ID " + directive.getId() +
+          " already exists");
+    }
+    List<CacheDirective> directives = directivesByPath.get(directive.getPath());
+    if (directives == null) {
+      directives = new LinkedList<CacheDirective>();
+      directivesByPath.put(directive.getPath(), directives);
+    }
+    directives.add(directive);
+  }
+
   private final class SerializerCompat {
     private void save(DataOutputStream out, String sdPath) throws IOException {
       out.writeLong(nextDirectiveId);
@@ -1025,27 +1146,10 @@ public final class CacheManager {
         CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
         // Get pool reference by looking it up in the map
         final String poolName = info.getPool();
-        CachePool pool = cachePools.get(poolName);
-        if (pool == null) {
-          throw new IOException("Directive refers to pool " + poolName +
-              ", which does not exist.");
-        }
         CacheDirective directive = new CacheDirective(info.getId(),
             info.getPath().toUri().getPath(), info.getReplication(),
             info.getExpiration().getAbsoluteMillis());
-        boolean addedDirective = pool.getDirectiveList().add(directive);
-        assert addedDirective;
-        if (directivesById.put(directive.getId(), directive) != null) {
-          throw new IOException("A directive with ID " + directive.getId() +
-              " already exists");
-        }
-        List<CacheDirective> directives =
-            directivesByPath.get(directive.getPath());
-        if (directives == null) {
-          directives = new LinkedList<CacheDirective>();
-          directivesByPath.put(directive.getPath(), directives);
-        }
-        directives.add(directive);
+        addCacheDirective(poolName, directive);
         counter.increment();
       }
       prog.endStep(Phase.LOADING_FSIMAGE, step);
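A note on the guarded setters in saveState() above: every field of CachePoolInfoProto and CacheDirectiveInfoProto is optional, so a null value is simply left off the wire, and loadState() uses hasOwnerName(), hasMode(), and so on to distinguish "never set" from a default value. A minimal illustration of that contract follows; the demo class is hypothetical and assumes the generated protos are on the classpath.

    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;

    // Why saveState() null-checks before each set: unset optional fields are
    // not serialized, and hasX() reports their absence after a round trip.
    public class OptionalFieldDemo {
      public static void main(String[] args) {
        CacheDirectiveInfoProto p = CacheDirectiveInfoProto.newBuilder()
            .setPath("/warm/data")   // path and pool are set;
            .setPool("pool1")        // replication is deliberately left unset
            .build();

        System.out.println(p.hasReplication()); // false: nothing was persisted
        System.out.println(p.getReplication()); // schema default, not stored data
      }
    }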
Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java?rev=1559874&r1=1559873&r2=1559874&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java Tue Jan 21 01:17:35 2014
@@ -46,6 +46,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
@@ -220,6 +224,9 @@ public final class FSImageFormatProtobuf
       case SNAPSHOT_DIFF:
         snapshotLoader.loadSnapshotDiffSection(in);
         break;
+      case CACHE_MANAGER:
+        loadCacheManagerSection(in);
+        break;
       default:
         LOG.warn("Unregconized section " + n);
         break;
@@ -246,6 +253,21 @@ public final class FSImageFormatProtobuf
         stringTable[e.getId()] = e.getStr();
       }
     }
+
+    private void loadCacheManagerSection(InputStream in) throws IOException {
+      CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(in);
+      ArrayList<CachePoolInfoProto> pools = Lists.newArrayListWithCapacity(s
+          .getNumPools());
+      ArrayList<CacheDirectiveInfoProto> directives = Lists
+          .newArrayListWithCapacity(s.getNumDirectives());
+      for (int i = 0; i < s.getNumPools(); ++i)
+        pools.add(CachePoolInfoProto.parseDelimitedFrom(in));
+      for (int i = 0; i < s.getNumDirectives(); ++i)
+        directives.add(CacheDirectiveInfoProto.parseDelimitedFrom(in));
+      fsn.getCacheManager().loadState(
+          new CacheManager.PersistState(s, pools, directives));
+    }
+
   }
 
   public static final class Saver {
@@ -352,6 +374,8 @@ public final class FSImageFormatProtobuf
       saveSnapshots(b);
       saveStringTableSection(b);
 
+      saveCacheManagerSection(b);
+
       // Flush the buffered data into the file before appending the header
       flushSectionOutputStream();
 
@@ -361,6 +385,20 @@ public final class FSImageFormatProtobuf
       savedDigest = new MD5Hash(digester.digest());
     }
 
+    private void saveCacheManagerSection(FileSummary.Builder summary) throws IOException {
+      final FSNamesystem fsn = context.getSourceNamesystem();
+      CacheManager.PersistState state = fsn.getCacheManager().saveState();
+      state.section.writeDelimitedTo(sectionOutputStream);
+
+      for (CachePoolInfoProto p : state.pools)
+        p.writeDelimitedTo(sectionOutputStream);
+
+      for (CacheDirectiveInfoProto p : state.directives)
+        p.writeDelimitedTo(sectionOutputStream);
+
+      commitSection(summary, SectionName.CACHE_MANAGER);
+    }
+
     private void saveNameSystemSection(
         FileSummary.Builder summary) throws IOException {
       final FSNamesystem fsn = context.getSourceNamesystem();
@@ -443,7 +481,6 @@ public final class FSImageFormatProtobuf
     INODE_DIR("INODE_DIR"),
     FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
     SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
-    SECRET_MANAGER("SECRET_MANAGER"),
     CACHE_MANAGER("CACHE_MANAGER");
 
     private static final SectionName[] values = SectionName.values();
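Why loadCacheManagerSection() loops on getNumPools() and getNumDirectives() rather than reading until the stream is exhausted: delimited protobuf records carry no terminator, and the CACHE_MANAGER section sits in the middle of a larger fsimage stream, so an unbounded loop would read into the next section. parseDelimitedFrom() returns null only at true end-of-stream, as this hypothetical snippet shows (assuming the generated protos on the classpath):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;

    // parseDelimitedFrom() signals end-of-stream with null, which is only a
    // usable sentinel when the stream ends where the records end.
    public class EofSentinelDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayInputStream empty = new ByteArrayInputStream(new byte[0]);
        System.out.println(CachePoolInfoProto.parseDelimitedFrom(empty)); // null
      }
    }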
Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto?rev=1559874&r1=1559873&r2=1559874&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto Tue Jan 21 01:17:35 2014
@@ -73,7 +73,7 @@ message NameSystemSection {
 }
 
 /**
- * Permission is serialized as a 64-bit long. [0:24):[25:48):[48:64) (in Big Endian). 
+ * Permission is serialized as a 64-bit long. [0:24):[25:48):[48:64) (in Big Endian).
  * The first and the second parts are the string ids of the user and
  * group name, and the last 16 bits are the permission bits.
  *
@@ -113,7 +113,7 @@ message INodeSection {
   }
 
   message INodeReference {
-    // id of the referred inode 
+    // id of the referred inode
     optional uint64 referredId = 1;
     // local name recorded in WithName
     optional bytes name = 2;
@@ -132,7 +132,7 @@ message INodeSection {
     required Type type = 1;
     required uint64 id = 2;
     optional bytes name = 3;
-    
+
     optional INodeFile file = 4;
     optional INodeDirectory directory = 5;
     optional INodeSymlink symlink = 6;
@@ -205,7 +205,7 @@ message SnapshotDiffSection {
     optional INodeSection.INodeDirectory snapshotCopy = 5;
     optional uint32 clistSize = 6;
     optional uint32 dlistSize = 7;
-    repeated uint64 deletedINode = 8; // id of deleted inode 
+    repeated uint64 deletedINode = 8; // id of deleted inode
     // repeated CreatedListEntry
     // repeated INodeReference (number of ref: dlistSize - dlist.size)
   }
@@ -238,3 +238,11 @@ message StringTableSection {
   optional uint32 numEntry = 1;
   // repeated Entry
 }
+
+message CacheManagerSection {
+  required uint64 nextDirectiveId = 1;
+  required uint32 numPools        = 2;
+  required uint32 numDirectives   = 3;
+  // repeated CachePoolInfoProto pools
+  // repeated CacheDirectiveInfoProto directives
+}
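The trailing "// repeated ..." comments in CacheManagerSection are documentation, not proto fields: the pool and directive records follow the header as separate delimited messages. Each writeDelimitedTo() call prefixes the message body with its varint-encoded byte length, which is what lets parseDelimitedFrom() find the record boundaries. A hedged sketch of that wire layout (hypothetical demo class; the single-byte prefix assumed in the last comment holds only for messages under 128 bytes):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import com.google.protobuf.CodedInputStream;
    import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;

    // Peeks at the bytes produced by writeDelimitedTo(): a varint length
    // prefix, then the message body.
    public class DelimitedLayoutDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        CacheManagerSection.newBuilder()
            .setNextDirectiveId(1).setNumPools(0).setNumDirectives(0)
            .build().writeDelimitedTo(out);

        byte[] bytes = out.toByteArray();
        CodedInputStream cis = CodedInputStream.newInstance(bytes);
        int bodyLen = cis.readRawVarint32();
        // For this small message the prefix is a single byte, so the body
        // length equals the total length minus one.
        System.out.println(bodyLen == bytes.length - 1); // true
      }
    }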