Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Tue Aug 19 23:49:39 2014
@@ -37,8 +37,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
@@ -47,6 +45,8 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2)
@@ -101,7 +101,8 @@ public class FsDatasetCache {
     }
   }
 
-  private static final Log LOG = LogFactory.getLog(FsDatasetCache.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FsDatasetCache
+      .class);
 
   /**
    * Stores MappableBlock objects and the states they're in.
@@ -245,21 +246,17 @@ public class FsDatasetCache {
     ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
     if (prevValue != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block with id " + blockId + ", pool " + bpid +
-            " already exists in the FsDatasetCache with state " +
-            prevValue.state);
-      }
+      LOG.debug("Block with id {}, pool {} already exists in the " +
+          "FsDatasetCache with state {}", blockId, bpid, prevValue.state
+      );
       numBlocksFailedToCache.incrementAndGet();
       return;
     }
     mappableBlockMap.put(key, new Value(null, State.CACHING));
     volumeExecutor.execute(
         new CachingTask(key, blockFileName, length, genstamp));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Initiating caching for Block with id " + blockId +
-          ", pool " + bpid);
-    }
+    LOG.debug("Initiating caching for Block with id {}, pool {}", blockId,
+        bpid);
   }
 
   synchronized void uncacheBlock(String bpid, long blockId) {
@@ -270,44 +267,34 @@ public class FsDatasetCache {
         processBlockMunlockRequest(key)) {
       // TODO: we probably want to forcibly uncache the block (and close the
       // shm) after a certain timeout has elapsed.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(key + " is anchored, and can't be uncached now.");
-      }
+      LOG.debug("{} is anchored, and can't be uncached now.", key);
       return;
     }
     if (prevValue == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
-            "does not need to be uncached, because it is not currently " +
-            "in the mappableBlockMap.");
-      }
+      LOG.debug("Block with id {}, pool {} does not need to be uncached, " +
+          "because it is not currently in the mappableBlockMap.", blockId,
+          bpid);
       numBlocksFailedToUncache.incrementAndGet();
       return;
     }
     switch (prevValue.state) {
     case CACHING:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Cancelling caching for block with id " + blockId +
-            ", pool " + bpid + ".");
-      }
+      LOG.debug("Cancelling caching for block with id {}, pool {}.", blockId,
+          bpid);
       mappableBlockMap.put(key, new Value(prevValue.mappableBlock,
           State.CACHING_CANCELLED));
       break;
     case CACHED:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
-            "has been scheduled for uncaching.");
-      }
+      LOG.debug(
+          "Block with id {}, pool {} has been scheduled for uncaching" + ".",
+          blockId, bpid);
       mappableBlockMap.put(key, new Value(prevValue.mappableBlock,
           State.UNCACHING));
       uncachingExecutor.execute(new UncachingTask(key));
       break;
     default:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
-            "does not need to be uncached, because it is " +
-            "in state " + prevValue.state + ".");
-      }
+      LOG.debug("Block with id {}, pool {} does not need to be uncached, " +
+          "because it is in state {}.", blockId, bpid, prevValue.state);
       numBlocksFailedToUncache.incrementAndGet();
       break;
     }
   }
@@ -386,10 +373,8 @@ public class FsDatasetCache {
         }
         mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED));
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Successfully cached " + key + ". We are now caching " +
-            newUsedBytes + " bytes in total.");
-      }
+      LOG.debug("Successfully cached {}. We are now caching {} bytes in" +
+          " total.", key, newUsedBytes);
       dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
       numBlocksCached.addAndGet(1);
       dataset.datanode.getMetrics().incrBlocksCached(1);
@@ -399,12 +384,10 @@ public class FsDatasetCache {
       IOUtils.closeQuietly(metaIn);
       if (!success) {
         if (reservedBytes) {
-          newUsedBytes = usedBytesCount.release(length);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Caching of " + key + " was aborted. We are now " +
-              "caching only " + newUsedBytes + " + bytes in total.");
+          usedBytesCount.release(length);
         }
+        LOG.debug("Caching of {} was aborted. We are now caching only {} " +
+            "bytes in total.", key, usedBytesCount.get());
         if (mappableBlock != null) {
           mappableBlock.close();
         }
@@ -444,10 +427,7 @@ public class FsDatasetCache {
           usedBytesCount.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Uncaching of " + key + " completed. " +
-            "usedBytes = " + newUsedBytes);
-      }
+      LOG.debug("Uncaching of {} completed. usedBytes = {}", key, newUsedBytes);
     }
   }
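The FsDatasetCache hunks above follow the usual SLF4J migration pattern: the commons-logging Log/LogFactory pair is replaced by an SLF4J Logger, and the explicit if (LOG.isDebugEnabled()) guards are dropped because {} placeholders defer message formatting until the level is actually enabled. Below is a minimal, self-contained sketch of the before/after idiom; the class name and message text are illustrative only, not part of the patch, and an SLF4J binding is assumed to be on the classpath.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PlaceholderLoggingSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(PlaceholderLoggingSketch.class);

      void cacheBlock(long blockId, String bpid) {
        // Old commons-logging style: without the guard, the strings are
        // concatenated even when DEBUG is disabled.
        //   if (LOG.isDebugEnabled()) {
        //     LOG.debug("Initiating caching for Block with id " + blockId +
        //         ", pool " + bpid);
        //   }

        // SLF4J style: the message is only formatted when DEBUG is enabled,
        // so no guard is needed for cheap arguments.
        LOG.debug("Initiating caching for Block with id {}, pool {}",
            blockId, bpid);
      }

      public static void main(String[] args) {
        new PlaceholderLoggingSketch().cacheBlock(42L, "bp-1");
      }
    }

A guard is still worth keeping when computing a log argument is itself expensive; for plain fields, as in the hunks above, the placeholder form is equivalent and shorter.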
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Tue Aug 19 23:49:39 2014 @@ -17,19 +17,67 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; -import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.datanode.*; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.*; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; +import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; +import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; 
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -43,15 +91,6 @@ import org.apache.hadoop.util.DiskChecke import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; -import javax.management.StandardMBean; -import java.io.*; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.*; -import java.util.concurrent.Executor; - /************************************************** * FSDataset manages a set of data blocks. Each block * has a unique name and an extent on disk. @@ -120,10 +159,8 @@ class FsDatasetImpl implements FsDataset /** - * Returns a clone of a replica stored in data-node memory. - * Should be primarily used for testing. - * @param blockId - * @return + * This should be primarily used for testing. + * @return clone of replica store in datanode memory */ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { ReplicaInfo r = volumeMap.get(bpid, blockId); @@ -165,6 +202,7 @@ class FsDatasetImpl implements FsDataset final Map<String, DatanodeStorage> storageMap; final FsDatasetAsyncDiskService asyncDiskService; final FsDatasetCache cacheManager; + private final Configuration conf; private final int validVolsRequired; final ReplicaMap volumeMap; @@ -179,6 +217,7 @@ class FsDatasetImpl implements FsDataset ) throws IOException { this.datanode = datanode; this.dataStorage = storage; + this.conf = conf; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. 
final int volFailuresTolerated = @@ -205,38 +244,76 @@ class FsDatasetImpl implements FsDataset } storageMap = new HashMap<String, DatanodeStorage>(); - final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>( - storage.getNumStorageDirs()); - for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { - Storage.StorageDirectory sd = storage.getStorageDir(idx); - final File dir = sd.getCurrentDir(); - final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot()); - volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf, - storageType)); - LOG.info("Added volume - " + dir + ", StorageType: " + storageType); - storageMap.put(sd.getStorageUuid(), - new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType)); - } volumeMap = new ReplicaMap(this); - @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); - volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl); - volumes.initializeReplicaMaps(volumeMap); + volumes = new FsVolumeList(volsFailed, blockChooserImpl); + asyncDiskService = new FsDatasetAsyncDiskService(datanode); - File[] roots = new File[storage.getNumStorageDirs()]; for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { - roots[idx] = storage.getStorageDir(idx).getCurrentDir(); + addVolume(dataLocations, storage.getStorageDir(idx)); } - asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots); + cacheManager = new FsDatasetCache(this); registerMBean(datanode.getDatanodeUuid()); } + private void addVolume(Collection<StorageLocation> dataLocations, + Storage.StorageDirectory sd) throws IOException { + final File dir = sd.getCurrentDir(); + final StorageType storageType = + getStorageTypeFromLocations(dataLocations, sd.getRoot()); + + // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is + // nothing needed to be rolled back to make various data structures, e.g., + // storageMap and asyncDiskService, consistent. + FsVolumeImpl fsVolume = new FsVolumeImpl( + this, sd.getStorageUuid(), dir, this.conf, storageType); + fsVolume.getVolumeMap(volumeMap); + + volumes.addVolume(fsVolume); + storageMap.put(sd.getStorageUuid(), + new DatanodeStorage(sd.getStorageUuid(), + DatanodeStorage.State.NORMAL, + storageType)); + asyncDiskService.addVolume(sd.getCurrentDir()); + + LOG.info("Added volume - " + dir + ", StorageType: " + storageType); + } + + /** + * Add an array of StorageLocation to FsDataset. + * + * @pre dataStorage must have these volumes. 
+ * @param volumes + * @throws IOException + */ + @Override + public synchronized void addVolumes(Collection<StorageLocation> volumes) + throws IOException { + final Collection<StorageLocation> dataLocations = + DataNode.getStorageLocations(this.conf); + Map<String, Storage.StorageDirectory> allStorageDirs = + new HashMap<String, Storage.StorageDirectory>(); + for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { + Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); + allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd); + } + + for (StorageLocation vol : volumes) { + String key = vol.getFile().getAbsolutePath(); + if (!allStorageDirs.containsKey(key)) { + LOG.warn("Attempt to add an invalid volume: " + vol.getFile()); + } else { + addVolume(dataLocations, allStorageDirs.get(key)); + } + } + } + private StorageType getStorageTypeFromLocations( Collection<StorageLocation> dataLocations, File dir) { for (StorageLocation dataLocation : dataLocations) { @@ -738,8 +815,8 @@ class FsDatasetImpl implements FsDataset } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createRbw(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipeline createRbw(StorageType storageType, + ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { @@ -748,7 +825,7 @@ class FsDatasetImpl implements FsDataset " and thus cannot be created."); } // create a new block - FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); + FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); // create a rbw file to hold block in the designated volume File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), @@ -876,8 +953,8 @@ class FsDatasetImpl implements FsDataset } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipeline createTemporary(StorageType storageType, + ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + @@ -885,7 +962,7 @@ class FsDatasetImpl implements FsDataset " and thus cannot be created."); } - FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); + FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); // create a temporary file to hold block in the designated volume File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), @@ -1114,7 +1191,7 @@ class FsDatasetImpl implements FsDataset return f; // if file is not null, but doesn't exist - possibly disk failed - datanode.checkDiskError(); + datanode.checkDiskErrorAsync(); } if (LOG.isDebugEnabled()) { @@ -1187,17 +1264,17 @@ class FsDatasetImpl implements FsDataset + ". Parent not found for file " + f); continue; } - ReplicaState replicaState = info.getState(); - if (replicaState == ReplicaState.FINALIZED || - (replicaState == ReplicaState.RUR && - ((ReplicaUnderRecovery)info).getOriginalReplica().getState() == - ReplicaState.FINALIZED)) { - v.clearPath(bpid, parent); - } volumeMap.remove(bpid, invalidBlks[i]); } + + // If a DFSClient has the replica in its cache of short-circuit file + // descriptors (and the client is using ShortCircuitShm), invalidate it. 
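The addVolumes() implementation shown above only accepts locations that dataStorage already knows about: it indexes the existing storage directories by absolute path and adds a volume only when a requested location matches one of them. Here is a simplified sketch of that matching step, using plain File paths in place of StorageLocation/StorageDirectory; the class and method names are assumptions for illustration, not Hadoop API.

    import java.io.File;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    public class VolumeMatchSketch {
      // Keep only the requested directories the storage layer already knows;
      // unknown paths are warned about and skipped instead of being added.
      static void addKnownVolumes(Collection<File> requested,
          Collection<File> knownStorageDirs) {
        Map<String, File> byPath = new HashMap<String, File>();
        for (File dir : knownStorageDirs) {
          byPath.put(dir.getAbsolutePath(), dir);
        }
        for (File vol : requested) {
          File match = byPath.get(vol.getAbsolutePath());
          if (match == null) {
            System.err.println("Attempt to add an invalid volume: " + vol);
          } else {
            // In FsDatasetImpl this is where addVolume(...) would run.
            System.out.println("Adding volume " + match);
          }
        }
      }
    }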
+ datanode.getShortCircuitRegistry().processBlockInvalidation( + new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid)); + // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId()); + // Delete the block asynchronously to make sure we can do it fast enough. // It's ok to unlink the block file before the uncache operation // finishes. @@ -1581,7 +1658,7 @@ class FsDatasetImpl implements FsDataset datanode.getDnConf().getXceiverStopTimeout()); } - /** static version of {@link #initReplicaRecovery(Block, long)}. */ + /** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map, Block block, long recoveryId, long xceiverStopTimeout) throws IOException { final ReplicaInfo replica = map.get(bpid, block.getBlockId()); @@ -1909,5 +1986,13 @@ class FsDatasetImpl implements FsDataset } return new RollingLogsImpl(dir, prefix); } + + @Override + public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, + FileDescriptor fd, long offset, long nbytes, int flags) { + FsVolumeImpl fsVolumeImpl = this.getVolume(block); + asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset, + nbytes, flags); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java Tue Aug 19 23:49:39 2014 @@ -50,7 +50,7 @@ public class FsDatasetUtil { } /** Find the corresponding meta data file from a given block file */ - static File findMetaFile(final File blockFile) throws IOException { + public static File findMetaFile(final File blockFile) throws IOException { final String prefix = blockFile.getName() + "_"; final File parent = blockFile.getParentFile(); final File[] matches = parent.listFiles(new FilenameFilter() { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Tue Aug 19 23:49:39 2014 @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.DataStorage; 
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.DiskChecker.DiskErrorException; @@ -235,10 +236,6 @@ class FsVolumeImpl implements FsVolumeSp // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); bp.addToReplicasMap(volumeMap, dir, isFinalized); } - - void clearPath(String bpid, File f) throws IOException { - getBlockPoolSlice(bpid).clearPath(f); - } @Override public String toString() { @@ -274,7 +271,8 @@ class FsVolumeImpl implements FsVolumeSp File finalizedDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_FINALIZED); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); - if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) { + if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive( + finalizedDir)) { return false; } if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) { @@ -301,7 +299,8 @@ class FsVolumeImpl implements FsVolumeSp if (!rbwDir.delete()) { throw new IOException("Failed to delete " + rbwDir); } - if (!finalizedDir.delete()) { + if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) || + !FileUtil.fullyDelete(finalizedDir)) { throw new IOException("Failed to delete " + finalizedDir); } FileUtil.fullyDelete(tmpDir); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Tue Aug 19 23:49:39 2014 @@ -18,12 +18,17 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.Time; class FsVolumeList { /** @@ -35,9 +40,8 @@ class FsVolumeList { private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser; private volatile int numFailedVolumes; - FsVolumeList(List<FsVolumeImpl> volumes, int failedVols, + FsVolumeList(int failedVols, VolumeChoosingPolicy<FsVolumeImpl> blockChooser) { - this.volumes = Collections.unmodifiableList(volumes); this.blockChooser = blockChooser; this.numFailedVolumes = failedVols; } @@ -51,11 +55,18 @@ class FsVolumeList { * by a single thread and next volume is chosen with no concurrent * update to {@link #volumes}. * @param blockSize free space needed on the volume + * @param storageType the desired {@link StorageType} * @return next volume to store the block in. 
*/ - // TODO should choose volume with storage type - synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException { - return blockChooser.chooseVolume(volumes, blockSize); + synchronized FsVolumeImpl getNextVolume(StorageType storageType, + long blockSize) throws IOException { + final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size()); + for(FsVolumeImpl v : volumes) { + if (v.getStorageType() == storageType) { + list.add(v); + } + } + return blockChooser.chooseVolume(list, blockSize); } long getDfsUsed() throws IOException { @@ -89,15 +100,9 @@ class FsVolumeList { } return remaining; } - - void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException { - for (FsVolumeImpl v : volumes) { - v.getVolumeMap(globalReplicaMap); - } - } void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException { - long totalStartTime = System.currentTimeMillis(); + long totalStartTime = Time.monotonicNow(); final List<IOException> exceptions = Collections.synchronizedList( new ArrayList<IOException>()); List<Thread> replicaAddingThreads = new ArrayList<Thread>(); @@ -107,9 +112,9 @@ class FsVolumeList { try { FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid + " on volume " + v + "..."); - long startTime = System.currentTimeMillis(); + long startTime = Time.monotonicNow(); v.getVolumeMap(bpid, volumeMap); - long timeTaken = System.currentTimeMillis() - startTime; + long timeTaken = Time.monotonicNow() - startTime; FsDatasetImpl.LOG.info("Time to add replicas to map for block pool" + " " + bpid + " on volume " + v + ": " + timeTaken + "ms"); } catch (IOException ioe) { @@ -132,7 +137,7 @@ class FsVolumeList { if (!exceptions.isEmpty()) { throw exceptions.get(0); } - long totalTimeTaken = System.currentTimeMillis() - totalStartTime; + long totalTimeTaken = Time.monotonicNow() - totalStartTime; FsDatasetImpl.LOG.info("Total time to add all replicas to map: " + totalTimeTaken + "ms"); } @@ -141,9 +146,9 @@ class FsVolumeList { throws IOException { FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid + " on volume " + volume + "..."); - long startTime = System.currentTimeMillis(); + long startTime = Time.monotonicNow(); volume.getVolumeMap(bpid, volumeMap); - long timeTaken = System.currentTimeMillis() - startTime; + long timeTaken = Time.monotonicNow() - startTime; FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid + " on volume " + volume + ": " + timeTaken + "ms"); } @@ -193,9 +198,22 @@ class FsVolumeList { return volumes.toString(); } + /** + * Dynamically add new volumes to the existing volumes that this DN manages. + * @param newVolume the instance of new FsVolumeImpl. + */ + synchronized void addVolume(FsVolumeImpl newVolume) { + // Make a copy of volumes to add new volumes. + final List<FsVolumeImpl> volumeList = volumes == null ? 
+ new ArrayList<FsVolumeImpl>() : + new ArrayList<FsVolumeImpl>(volumes); + volumeList.add(newVolume); + volumes = Collections.unmodifiableList(volumeList); + FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString()); + } void addBlockPool(final String bpid, final Configuration conf) throws IOException { - long totalStartTime = System.currentTimeMillis(); + long totalStartTime = Time.monotonicNow(); final List<IOException> exceptions = Collections.synchronizedList( new ArrayList<IOException>()); @@ -206,9 +224,9 @@ class FsVolumeList { try { FsDatasetImpl.LOG.info("Scanning block pool " + bpid + " on volume " + v + "..."); - long startTime = System.currentTimeMillis(); + long startTime = Time.monotonicNow(); v.addBlockPool(bpid, conf); - long timeTaken = System.currentTimeMillis() - startTime; + long timeTaken = Time.monotonicNow() - startTime; FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid + " on " + v + ": " + timeTaken + "ms"); } catch (IOException ioe) { @@ -232,7 +250,7 @@ class FsVolumeList { throw exceptions.get(0); } - long totalTimeTaken = System.currentTimeMillis() - totalStartTime; + long totalTimeTaken = Time.monotonicNow() - totalStartTime; FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " + bpid + ": " + totalTimeTaken + "ms"); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java Tue Aug 19 23:49:39 2014 @@ -100,7 +100,6 @@ public class MappableBlock implements Cl /** * Verifies the block's checksum. This is an I/O intensive operation. - * @return if the block was successfully checksummed. */ private static void verifyChecksum(long length, FileInputStream metaIn, FileChannel blockChannel, String blockFileName) Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java Tue Aug 19 23:49:39 2014 @@ -188,11 +188,9 @@ class RollingLogsImpl implements Rolling if (reader != null && (line = reader.readLine()) != null) { return; } - if (line == null) { - // move to the next file. - if (openFile()) { - readNext(); - } + // move to the next file. 
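The new FsVolumeList.addVolume() above grows the volume list copy-on-write style: the current unmodifiable list is copied, the new volume is appended, and the copy is republished, so concurrent readers keep iterating a consistent snapshot. A small generic sketch of that idiom follows, under the assumption of a volatile field for safe publication; the class below is illustrative, not Hadoop code.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class CopyOnWriteListSketch<T> {
      // Always an unmodifiable snapshot; readers never see a half-built list.
      private volatile List<T> items = Collections.emptyList();

      public synchronized void add(T item) {
        List<T> copy = new ArrayList<T>(items);     // copy current snapshot
        copy.add(item);
        items = Collections.unmodifiableList(copy); // publish atomically
      }

      public List<T> snapshot() {
        return items;                               // iterate without locking
      }
    }

The patch additionally handles a null initial list; the sketch sidesteps that by starting from an empty list.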
+ if (openFile()) { + readNext(); } } finally { if (!hasNext()) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Tue Aug 19 23:49:39 2014 @@ -90,13 +90,15 @@ public class DataNodeMetrics { final MutableQuantiles[] sendDataPacketBlockedOnNetworkNanosQuantiles; @Metric MutableRate sendDataPacketTransferNanos; final MutableQuantiles[] sendDataPacketTransferNanosQuantiles; - final MetricsRegistry registry = new MetricsRegistry("datanode"); final String name; - - public DataNodeMetrics(String name, String sessionId, int[] intervals) { + JvmMetrics jvmMetrics = null; + + public DataNodeMetrics(String name, String sessionId, int[] intervals, + final JvmMetrics jvmMetrics) { this.name = name; + this.jvmMetrics = jvmMetrics; registry.tag(SessionId, sessionId); final int len = intervals.length; @@ -131,7 +133,7 @@ public class DataNodeMetrics { public static DataNodeMetrics create(Configuration conf, String dnName) { String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY); MetricsSystem ms = DefaultMetricsSystem.instance(); - JvmMetrics.create("DataNode", sessionId, ms); + JvmMetrics jm = JvmMetrics.create("DataNode", sessionId, ms); String name = "DataNodeActivity-"+ (dnName.isEmpty() ? 
"UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() : dnName.replace(':', '-')); @@ -141,11 +143,15 @@ public class DataNodeMetrics { conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY); return ms.register(name, null, new DataNodeMetrics(name, sessionId, - intervals)); + intervals, jm)); } public String name() { return name; } + public JvmMetrics getJvmMetrics() { + return jvmMetrics; + } + public void addHeartbeat(long latency) { heartbeats.add(latency); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Tue Aug 19 23:49:39 2014 @@ -74,7 +74,6 @@ import org.apache.hadoop.hdfs.web.resour import org.apache.hadoop.hdfs.web.resources.ReplicationParam; import org.apache.hadoop.hdfs.web.resources.UriFsPathParam; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -127,9 +126,10 @@ public class DatanodeWebHdfsMethods { token.decodeFromUrlString(delegation); URI nnUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + nnId); - boolean isHA = HAUtil.isLogicalUri(conf, nnUri); - if (isHA) { - token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri)); + boolean isLogical = HAUtil.isLogicalUri(conf, nnUri); + if (isLogical) { + token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri, + HdfsConstants.HDFS_URI_SCHEME)); } else { token.setService(SecurityUtil.buildTokenService(nnUri)); } @@ -452,7 +452,7 @@ public class DatanodeWebHdfsMethods { MD5MD5CRC32FileChecksum checksum = null; DFSClient dfsclient = newDfsClient(nnId, conf); try { - checksum = dfsclient.getFileChecksum(fullpath); + checksum = dfsclient.getFileChecksum(fullpath, Long.MAX_VALUE); dfsclient.close(); dfsclient = null; } finally { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java Tue Aug 19 23:49:39 2014 @@ -27,8 +27,10 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntryScope; import org.apache.hadoop.fs.permission.AclEntryType; +import org.apache.hadoop.fs.permission.AclUtil; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.ScopedAclEntries; import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; @@ -90,7 +92,7 @@ final class AclStorage { FsPermission childPerm = child.getFsPermission(); // Copy each default ACL entry from parent to new child's access ACL. - boolean parentDefaultIsMinimal = isMinimalAcl(parentDefaultEntries); + boolean parentDefaultIsMinimal = AclUtil.isMinimalAcl(parentDefaultEntries); for (AclEntry entry: parentDefaultEntries) { AclEntryType type = entry.getType(); String name = entry.getName(); @@ -127,7 +129,7 @@ final class AclStorage { Collections.<AclEntry>emptyList(); final FsPermission newPerm; - if (!isMinimalAcl(accessEntries) || !defaultEntries.isEmpty()) { + if (!AclUtil.isMinimalAcl(accessEntries) || !defaultEntries.isEmpty()) { // Save the new ACL to the child. child.addAclFeature(createAclFeature(accessEntries, defaultEntries)); newPerm = createFsPermissionForExtendedAcl(accessEntries, childPerm); @@ -172,7 +174,7 @@ final class AclStorage { FsPermission perm = inode.getFsPermission(); AclFeature f = inode.getAclFeature(); if (f == null) { - return getMinimalAcl(perm); + return AclUtil.getMinimalAcl(perm); } final List<AclEntry> existingAcl; @@ -208,7 +210,7 @@ final class AclStorage { } else { // It's possible that there is a default ACL but no access ACL. In this // case, add the minimal access ACL implied by the permission bits. - existingAcl.addAll(getMinimalAcl(perm)); + existingAcl.addAll(AclUtil.getMinimalAcl(perm)); } // Add all default entries after the access entries. @@ -267,7 +269,7 @@ final class AclStorage { assert newAcl.size() >= 3; FsPermission perm = inode.getFsPermission(); final FsPermission newPerm; - if (!isMinimalAcl(newAcl)) { + if (!AclUtil.isMinimalAcl(newAcl)) { // This is an extended ACL. Split entries into access vs. default. ScopedAclEntries scoped = new ScopedAclEntries(newAcl); List<AclEntry> accessEntries = scoped.getAccessEntries(); @@ -321,7 +323,7 @@ final class AclStorage { // For the access ACL, the feature only needs to hold the named user and // group entries. For a correctly sorted ACL, these will be in a // predictable range. - if (!isMinimalAcl(accessEntries)) { + if (!AclUtil.isMinimalAcl(accessEntries)) { featureEntries.addAll( accessEntries.subList(1, accessEntries.size() - 2)); } @@ -336,6 +338,10 @@ final class AclStorage { * ACL, based on its access ACL entries. For a correctly sorted ACL, the * first entry is the owner and the last 2 entries are the mask and other * entries respectively. Also preserve sticky bit and toggle ACL bit on. + * Note that this method intentionally copies the permissions of the mask + * entry into the FsPermission group permissions. This is consistent with the + * POSIX ACLs model, which presents the mask as the permissions of the group + * class. * * @param accessEntries List<AclEntry> access ACL entries * @param existingPerm FsPermission existing permissions @@ -366,41 +372,4 @@ final class AclStorage { accessEntries.get(2).getPermission(), existingPerm.getStickyBit()); } - - /** - * Translates the given permission bits to the equivalent minimal ACL. 
- * - * @param perm FsPermission to translate - * @return List<AclEntry> containing exactly 3 entries representing the owner, - * group and other permissions - */ - private static List<AclEntry> getMinimalAcl(FsPermission perm) { - return Lists.newArrayList( - new AclEntry.Builder() - .setScope(AclEntryScope.ACCESS) - .setType(AclEntryType.USER) - .setPermission(perm.getUserAction()) - .build(), - new AclEntry.Builder() - .setScope(AclEntryScope.ACCESS) - .setType(AclEntryType.GROUP) - .setPermission(perm.getGroupAction()) - .build(), - new AclEntry.Builder() - .setScope(AclEntryScope.ACCESS) - .setType(AclEntryType.OTHER) - .setPermission(perm.getOtherAction()) - .build()); - } - - /** - * Checks if the given entries represent a minimal ACL (contains exactly 3 - * entries). - * - * @param entries List<AclEntry> entries to check - * @return boolean true if the entries represent a minimal ACL - */ - private static boolean isMinimalAcl(List<AclEntry> entries) { - return entries.size() == 3; - } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java Tue Aug 19 23:49:39 2014 @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.permission.A import org.apache.hadoop.fs.permission.AclEntryType; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.ScopedAclEntries; import org.apache.hadoop.hdfs.protocol.AclException; /** Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Tue Aug 19 23:49:39 2014 @@ -355,7 +355,7 @@ public class BackupNode extends NameNode /** * Register this backup node with the active name-node. 
- * @param nsInfo + * @param nsInfo namespace information * @throws IOException */ private void registerWith(NamespaceInfo nsInfo) throws IOException { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Tue Aug 19 23:49:39 2014 @@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT; import java.io.DataInput; +import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -42,8 +43,6 @@ import java.util.TreeMap; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; @@ -61,10 +60,10 @@ import org.apache.hadoop.hdfs.protocol.C import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; @@ -84,6 +83,8 @@ import org.apache.hadoop.security.Access import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -98,7 +99,7 @@ import com.google.common.collect.Lists; */ @InterfaceAudience.LimitedPrivate({"HDFS"}) public final class CacheManager { - public static final Log LOG = LogFactory.getLog(CacheManager.class); + public static final Logger LOG = LoggerFactory.getLogger(CacheManager.class); private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f; @@ -204,8 +205,8 @@ public final class CacheManager { DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT, DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT); if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) { - LOG.info("Using minimum value " + MIN_CACHED_BLOCKS_PERCENT + - " for " + 
DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT); + LOG.info("Using minimum value {} for {}", MIN_CACHED_BLOCKS_PERCENT, + DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT); cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT; } this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>( @@ -216,7 +217,7 @@ public final class CacheManager { /** * Resets all tracked directives and pools. Called during 2NN checkpointing to - * reset FSNamesystem state. See {FSNamesystem{@link #clear()}. + * reset FSNamesystem state. See {@link FSNamesystem#clear()}. */ void clear() { directivesById.clear(); @@ -345,10 +346,8 @@ public final class CacheManager { */ private static long validateExpiryTime(CacheDirectiveInfo info, long maxRelativeExpiryTime) throws InvalidRequestException { - if (LOG.isTraceEnabled()) { - LOG.trace("Validating directive " + info - + " pool maxRelativeExpiryTime " + maxRelativeExpiryTime); - } + LOG.trace("Validating directive {} pool maxRelativeExpiryTime {}", info, + maxRelativeExpiryTime); final long now = new Date().getTime(); final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime; if (info == null || info.getExpiration() == null) { @@ -538,7 +537,7 @@ public final class CacheManager { LOG.warn("addDirective of " + info + " failed: ", e); throw e; } - LOG.info("addDirective of " + info + " successful."); + LOG.info("addDirective of {} successful.", info); return directive.toInfo(); } @@ -640,8 +639,7 @@ public final class CacheManager { LOG.warn("modifyDirective of " + idString + " failed: ", e); throw e; } - LOG.info("modifyDirective of " + idString + " successfully applied " + - info+ "."); + LOG.info("modifyDirective of {} successfully applied {}.", idString, info); } private void removeInternal(CacheDirective directive) @@ -690,15 +688,25 @@ public final class CacheManager { assert namesystem.hasReadLock(); final int NUM_PRE_ALLOCATED_ENTRIES = 16; String filterPath = null; - if (filter.getId() != null) { - throw new IOException("Filtering by ID is unsupported."); - } if (filter.getPath() != null) { filterPath = validatePath(filter); } if (filter.getReplication() != null) { - throw new IOException("Filtering by replication is unsupported."); + throw new InvalidRequestException( + "Filtering by replication is unsupported."); + } + + // Querying for a single ID + final Long id = filter.getId(); + if (id != null) { + if (!directivesById.containsKey(id)) { + throw new InvalidRequestException("Did not find requested id " + id); + } + // Since we use a tailMap on directivesById, setting prev to id-1 gets + // us the directive with the id (if present) + prevId = id - 1; } + ArrayList<CacheDirectiveEntry> replies = new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES); int numReplies = 0; @@ -710,6 +718,14 @@ public final class CacheManager { } CacheDirective curDirective = cur.getValue(); CacheDirectiveInfo info = cur.getValue().toInfo(); + + // If the requested ID is present, it should be the first item. + // Hitting this case means the ID is not present, or we're on the second + // item and should break out. 
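The listCacheDirectives change above answers a single-ID query by reusing the existing paged scan: directives are kept in a map sorted by ID, so seeding the cursor at id - 1 makes the tail view start at the requested directive, and the loop breaks as soon as a different ID appears. A stripped-down sketch of that lookup follows, with plain strings standing in for CacheDirective objects and a null return in place of InvalidRequestException.

    import java.util.Map;
    import java.util.TreeMap;

    public class DirectiveLookupSketch {
      private final TreeMap<Long, String> directivesById =
          new TreeMap<Long, String>();

      // Returns the directive with the given id, or null if it is absent.
      String findById(long id) {
        if (!directivesById.containsKey(id)) {
          return null;
        }
        // Start the scan just below the requested id, like prevId = id - 1.
        for (Map.Entry<Long, String> e
            : directivesById.tailMap(id - 1, false).entrySet()) {
          if (e.getKey() != id) {
            break;        // past the requested id; nothing else can match
          }
          return e.getValue();
        }
        return null;
      }
    }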
+ if (id != null && + !(info.getId().equals(id))) { + break; + } if (filter.getPool() != null && !info.getPool().equals(filter.getPool())) { continue; @@ -760,7 +776,7 @@ public final class CacheManager { LOG.info("addCachePool of " + info + " failed: ", e); throw e; } - LOG.info("addCachePool of " + info + " successful."); + LOG.info("addCachePool of {} successful.", info); return pool.getInfo(true); } @@ -823,8 +839,8 @@ public final class CacheManager { LOG.info("modifyCachePool of " + info + " failed: ", e); throw e; } - LOG.info("modifyCachePool of " + info.getPoolName() + " successful; " - + bld.toString()); + LOG.info("modifyCachePool of {} successful; {}", info.getPoolName(), + bld.toString()); } /** @@ -916,11 +932,9 @@ public final class CacheManager { if (metrics != null) { metrics.addCacheBlockReport((int) (endTime - startTime)); } - if (LOG.isDebugEnabled()) { - LOG.debug("Processed cache report from " - + datanodeID + ", blocks: " + blockIds.size() - + ", processing time: " + (endTime - startTime) + " msecs"); - } + LOG.debug("Processed cache report from {}, blocks: {}, " + + "processing time: {} msecs", datanodeID, blockIds.size(), + (endTime - startTime)); } private void processCacheReportImpl(final DatanodeDescriptor datanode, @@ -931,6 +945,8 @@ public final class CacheManager { CachedBlocksList pendingCachedList = datanode.getPendingCached(); for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) { long blockId = iter.next(); + LOG.trace("Cache report from datanode {} has block {}", datanode, + blockId); CachedBlock cachedBlock = new CachedBlock(blockId, (short)0, false); CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock); @@ -940,19 +956,34 @@ public final class CacheManager { cachedBlock = prevCachedBlock; } else { cachedBlocks.put(cachedBlock); + LOG.trace("Added block {} to cachedBlocks", cachedBlock); } // Add the block to the datanode's implicit cached block list // if it's not already there. Similarly, remove it from the pending // cached block list if it exists there. if (!cachedBlock.isPresent(cachedList)) { cachedList.add(cachedBlock); + LOG.trace("Added block {} to CACHED list.", cachedBlock); } if (cachedBlock.isPresent(pendingCachedList)) { pendingCachedList.remove(cachedBlock); + LOG.trace("Removed block {} from PENDING_CACHED list.", cachedBlock); } } } + /** + * Saves the current state of the CacheManager to the DataOutput. Used + * to persist CacheManager state in the FSImage. 
+ * @param out DataOutput to persist state + * @param sdPath path of the storage directory + * @throws IOException + */ + public void saveStateCompat(DataOutputStream out, String sdPath) + throws IOException { + serializerCompat.save(out, sdPath); + } + public PersistState saveState() throws IOException { ArrayList<CachePoolInfoProto> pools = Lists .newArrayListWithCapacity(cachePools.size()); @@ -1072,6 +1103,12 @@ public final class CacheManager { } private final class SerializerCompat { + private void save(DataOutputStream out, String sdPath) throws IOException { + out.writeLong(nextDirectiveId); + savePools(out, sdPath); + saveDirectives(out, sdPath); + } + private void load(DataInput in) throws IOException { nextDirectiveId = in.readLong(); // pools need to be loaded first since directives point to their parent pool @@ -1080,6 +1117,42 @@ public final class CacheManager { } /** + * Save cache pools to fsimage + */ + private void savePools(DataOutputStream out, + String sdPath) throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.CACHE_POOLS, sdPath); + prog.beginStep(Phase.SAVING_CHECKPOINT, step); + prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size()); + Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); + out.writeInt(cachePools.size()); + for (CachePool pool: cachePools.values()) { + FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true)); + counter.increment(); + } + prog.endStep(Phase.SAVING_CHECKPOINT, step); + } + + /* + * Save cache entries to fsimage + */ + private void saveDirectives(DataOutputStream out, String sdPath) + throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.CACHE_ENTRIES, sdPath); + prog.beginStep(Phase.SAVING_CHECKPOINT, step); + prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size()); + Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); + out.writeInt(directivesById.size()); + for (CacheDirective directive : directivesById.values()) { + FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo()); + counter.increment(); + } + prog.endStep(Phase.SAVING_CHECKPOINT, step); + } + + /** * Load cache pools from fsimage */ private void loadPools(DataInput in) Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Tue Aug 19 23:49:39 2014 @@ -109,9 +109,7 @@ public final class CachePool { UserGroupInformation ugi = null; String ownerName = info.getOwnerName(); if (ownerName == null) { - if (ugi == null) { - ugi = NameNode.getRemoteUser(); - } + ugi = NameNode.getRemoteUser(); ownerName = ugi.getShortUserName(); } String groupName = info.getGroupName(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachedBlock.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachedBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachedBlock.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachedBlock.java Tue Aug 19 23:49:39 2014 @@ -125,7 +125,7 @@ public final class CachedBlock implement * @param type If null, this parameter is ignored. * If it is non-null, we match only datanodes which * have it on this list. - * See {@link DatanodeDescriptor#CachedBlocksList#Type} + * See {@link DatanodeDescriptor.CachedBlocksList.Type} * for a description of all the lists. * * @return The list of datanodes. Modifying this list does not Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java Tue Aug 19 23:49:39 2014 @@ -41,6 +41,9 @@ public class CheckpointConf { /** maxium number of retries when merge errors occur */ private final int maxRetriesOnMergeError; + + /** The output dir for legacy OIV image */ + private final String legacyOivImageDir; public CheckpointConf(Configuration conf) { checkpointCheckPeriod = conf.getLong( @@ -53,6 +56,7 @@ public class CheckpointConf { DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT); maxRetriesOnMergeError = conf.getInt(DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY, DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT); + legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY); warnForDeprecatedConfigs(conf); } @@ -83,4 +87,8 @@ public class CheckpointConf { public int getMaxRetriesOnMergeError() { return maxRetriesOnMergeError; } + + public String getLegacyOivImageDir() { + return legacyOivImageDir; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Tue Aug 19 23:49:39 2014 @@ -252,7 +252,7 @@ class Checkpointer extends Daemon { backupNode.namesystem.writeLock(); try { - backupNode.namesystem.dir.setReady(); + 
backupNode.namesystem.setImageLoaded(); if(backupNode.namesystem.getBlocksTotal() > 0) { backupNode.namesystem.setBlockTotal(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Tue Aug 19 23:49:39 2014 @@ -93,10 +93,6 @@ public class EditLogFileInputStream exte * @param name filename to open * @param firstTxId first transaction found in file * @param lastTxId last transaction id found in file - * @throws LogHeaderCorruptException if the header is either missing or - * appears to be corrupt/truncated - * @throws IOException if an actual IO error occurs while reading the - * header */ public EditLogFileInputStream(File name, long firstTxId, long lastTxId, boolean isInProgress) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Tue Aug 19 23:49:39 2014 @@ -127,14 +127,14 @@ public abstract class EditLogOutputStrea } /** - * Return total time spent in {@link #flushAndSync()} + * Return total time spent in {@link #flushAndSync(boolean)} */ long getTotalSyncTime() { return totalTimeSync; } /** - * Return number of calls to {@link #flushAndSync()} + * Return number of calls to {@link #flushAndSync(boolean)} */ protected long getNumSync() { return numSync; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Tue Aug 19 23:49:39 2014 @@ -48,6 +48,15 @@ public interface FSClusterStats { * @return Number of datanodes that are both alive and not decommissioned. 
    */
   public int getNumDatanodesInService();
+
+  /**
+   * an indication of the average load of non-decommission(ing|ed) nodes
+   * eligible for block placement
+   *
+   * @return average of the in service number of block transfers and block
+   *         writes that are currently occurring on the cluster.
+   */
+  public double getInServiceXceiverAverage();
 }
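The new FSClusterStats method exposes the average transceiver (block transfer and write) load across in-service datanodes, which block placement can consult to steer writes away from overloaded nodes. A minimal sketch of how an implementer of this contract might compute the figure; everything except the getInServiceXceiverAverage() name is an assumption for illustration.

    public class InServiceLoadSketch {
      private int nodesInService;
      private int xceiversInService;

      // Updated as datanodes register, heartbeat, or are decommissioned.
      synchronized void update(int nodeCount, int totalXceiverCount) {
        this.nodesInService = nodeCount;
        this.xceiversInService = totalXceiverCount;
      }

      // Average number of active transfer threads per in-service datanode,
      // mirroring the FSClusterStats#getInServiceXceiverAverage contract.
      synchronized double getInServiceXceiverAverage() {
        if (nodesInService == 0) {
          return 0.0;   // avoid division by zero on an empty cluster
        }
        return (double) xceiversInService / nodesInService;
      }
    }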