Author: szetszwo
Date: Sat Mar 10 02:06:00 2012
New Revision: 1299146
URL: http://svn.apache.org/viewvc?rev=1299146&view=rev
Log:
Merge r1299139 and r1299144 from trunk for HDFS-3056.
Added:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RollingLogs.java
- copied unchanged from r1299144, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RollingLogs.java
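Since RollingLogs.java is copied unchanged from trunk, its contents do not appear in the hunks below. A minimal sketch of the new interface, inferred only from its call sites in this diff (iterator(boolean), appender(), roll(), and LineIterator.isPrevious()), might look as follows; the exact declarations and javadoc in r1299144 may differ:

package org.apache.hadoop.hdfs.server.datanode;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

/** Sketch only: rolling logs consist of a current file and a previous file. */
public interface RollingLogs {
  /** An appender is an Appendable that can also be closed. */
  public interface Appender extends Appendable, Closeable {
  }

  /** Iterates over log lines and reports which file a line came from. */
  public interface LineIterator extends Iterator<String>, Closeable {
    /** @return true if the iterator is currently reading the previous file. */
    public boolean isPrevious();
  }

  /** @param skipPrevFile if true, iterate over the current file only. */
  public LineIterator iterator(boolean skipPrevFile) throws IOException;

  /** @return the appender for writing lines to the current file. */
  public Appender appender();

  /** Roll current to previous; may return false while readers are open. */
  public boolean roll() throws IOException;
}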
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Propchange: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 10 02:06:00 2012
@@ -1,5 +1,5 @@
/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs:1227776-1294021
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1161777,1161781,1161992,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1165826,1166402,1166466,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1171711,1172916,1173402,1173468,1173488,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177757,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1182189,1182205,1182214,1183081,1183098,1183175,1183554,1186508,1186896,1187140,1187505,1188282,1188286,1188300,1188436,1188487,1189028,1189355,1189360,1189546,1189613,1189901,1189932,1189982,1190077,1190127,1190620,1190708,1195575,1195656,1195731,1195754,1196113,1196129,1196171,1197329,1198903,1199396,1200731,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204366,1204370,1204376,1204388,1204544,1204707,1204709,1204825,1205146,1205260,1205626,1205697,1206178,1206786,1206830,1207585,
1207694,1208140,1208153,1208313,1210208,1210657,1210719,1210746,1211206,1211249,1211769,1212021,1212062,1212073,1212084,1212299,1212606,1213040,1213143,1213537,1213586,1213592-1213593,1213808,1213813,1213954,1213985,1214027,1214033,1214046,1214102-1214103,1214128,1215364,1215366,1220315,1220510,1221106,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227887,1227964,1229347,1229877,1229897,1230398,1231569,1231572,1231627,1231640,1233584,1233605,1234555,1235135,1235137,1235956,1236456,1238700,1238779,1238969,1239752,1240020,1240653,1240897,1240928,1241007,1241519,1242087,1242891,1243065,1243104,1243654,1243690,1244766,1245751,1245762,1292419,1292626,1293419,1293487,1295061,1295227,1295929,1297328,1298044,1298066,1298696,1298700,1299045
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1161777,1161781,1161992,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1165826,1166402,1166466,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1171711,1172916,1173402,1173468,1173488,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177757,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1182189,1182205,1182214,1183081,1183098,1183175,1183554,1186508,1186896,1187140,1187505,1188282,1188286,1188300,1188436,1188487,1189028,1189355,1189360,1189546,1189613,1189901,1189932,1189982,1190077,1190127,1190620,1190708,1195575,1195656,1195731,1195754,1196113,1196129,1196171,1197329,1198903,1199396,1200731,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204366,1204370,1204376,1204388,1204544,1204707,1204709,1204825,1205146,1205260,1205626,1205697,1206178,1206786,1206830,1207585,
1207694,1208140,1208153,1208313,1210208,1210657,1210719,1210746,1211206,1211249,1211769,1212021,1212062,1212073,1212084,1212299,1212606,1213040,1213143,1213537,1213586,1213592-1213593,1213808,1213813,1213954,1213985,1214027,1214033,1214046,1214102-1214103,1214128,1215364,1215366,1220315,1220510,1221106,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227887,1227964,1229347,1229877,1229897,1230398,1231569,1231572,1231627,1231640,1233584,1233605,1234555,1235135,1235137,1235956,1236456,1238700,1238779,1238969,1239752,1240020,1240653,1240897,1240928,1241007,1241519,1242087,1242891,1243065,1243104,1243654,1243690,1244766,1245751,1245762,1292419,1292626,1293419,1293487,1295061,1295227,1295929,1297328,1298044,1298066,1298696,1298700,1299045,1299139,1299144
/hadoop/core/branches/branch-0.19/hdfs:713112
/hadoop/hdfs/branches/HDFS-1052:987665-1095512
/hadoop/hdfs/branches/HDFS-265:796829-820463
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1299146&r1=1299145&r2=1299146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Mar 10 02:06:00 2012
@@ -45,8 +45,6 @@ Release 0.23.3 - UNRELEASED
HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple
storages. (suresh)
- HDFS-3021. Use generic type to declare FSDatasetInterface. (szetszwo)
-
HDFS-2430. The number of failed or low-resource volumes the NN can tolerate
should be configurable. (atm)
@@ -122,6 +120,11 @@ Release 0.23.3 - UNRELEASED
HDFS-3014. FSEditLogOp and its subclasses should have toString() method.
(Sho Shimauchi via atm)
+ HDFS-3021. Use generic type to declare FSDatasetInterface. (szetszwo)
+
+ HDFS-3056. Add a new interface RollingLogs for DataBlockScanner logging.
+ (szetszwo)
+
OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)
Propchange: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 10 02:06:00 2012
@@ -1,5 +1,5 @@
/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1227776-1294021
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1161777,1161781,1161992,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1165826,1166402,1166466,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1171711,1172916,1173402,1173468,1173488,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177757,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1182189,1182205,1182214,1183081,1183098,1183175,1183554,1186508,1186896,1187140,1187505,1188282,1188286,1188300,1188436,1188487,1189028,1189355,1189360,1189546,1189613,1189901,1189932,1189982,1190077,1190127,1190620,1190708,1195575,1195656,1195731,1195754,1196113,1196129,1196171,1197329,1198903,1199396,1200731,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204366,1204370,1204376,1204388,1204544,1204707,1205146,1205260,1205697,1206786,1206830,1207694,1208140,1208153,12
08313,1210208,1210657,1210719,1210746,1211206,1211249,1211769,1212021,1212062,1212073,1212084,1212299,1212606,1213040,1213143,1213537,1213586,1213592-1213593,1213808,1213813,1213954,1213985,1214027,1214033,1214046,1214102-1214103,1214128,1215364,1215366,1220315,1220510,1221106,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227887,1227964,1229347,1229877,1229897,1230398,1231569,1231572,1231627,1231640,1233584,1233605,1234555,1235135,1235137,1235956,1236456,1238700,1238779,1238969,1239752,1240020,1240653,1240897,1240928,1241007,1241519,1242087,1242891,1243065,1243104,1243654,1244766,1245751,1245762,1292419,1293419,1293487,1295061,1295227,1295929,1297328,1298044,1298696,1298700,1299045
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1161777,1161781,1161992,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1165826,1166402,1166466,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1171711,1172916,1173402,1173468,1173488,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177757,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1182189,1182205,1182214,1183081,1183098,1183175,1183554,1186508,1186896,1187140,1187505,1188282,1188286,1188300,1188436,1188487,1189028,1189355,1189360,1189546,1189613,1189901,1189932,1189982,1190077,1190127,1190620,1190708,1195575,1195656,1195731,1195754,1196113,1196129,1196171,1197329,1198903,1199396,1200731,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204366,1204370,1204376,1204388,1204544,1204707,1205146,1205260,1205697,1206786,1206830,1207694,1208140,1208153,12
08313,1210208,1210657,1210719,1210746,1211206,1211249,1211769,1212021,1212062,1212073,1212084,1212299,1212606,1213040,1213143,1213537,1213586,1213592-1213593,1213808,1213813,1213954,1213985,1214027,1214033,1214046,1214102-1214103,1214128,1215364,1215366,1220315,1220510,1221106,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227887,1227964,1229347,1229877,1229897,1230398,1231569,1231572,1231627,1231640,1233584,1233605,1234555,1235135,1235137,1235956,1236456,1238700,1238779,1238969,1239752,1240020,1240653,1240897,1240928,1241007,1241519,1242087,1242891,1243065,1243104,1243654,1244766,1245751,1245762,1292419,1293419,1293487,1295061,1295227,1295929,1297328,1298044,1298696,1298700,1299045,1299139,1299144
/hadoop/core/branches/branch-0.19/hdfs/src/java:713112
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1299146&r1=1299145&r2=1299146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Sat Mar 10 02:06:00 2012
@@ -18,15 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode;
-import java.io.BufferedReader;
-import java.io.Closeable;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
import java.io.IOException;
-import java.io.PrintStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
@@ -34,7 +28,11 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -61,41 +59,43 @@ class BlockPoolSliceScanner {
public static final Log LOG = LogFactory.getLog(BlockPoolSliceScanner.class);
+ private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+
private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
-
- static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
+ private static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
+
+ private static final String VERIFICATION_PREFIX = "dncp_block_verification.log";
+
private final String blockPoolId;
-
- private static final String dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS";
-
- static final String verificationLogFile = "dncp_block_verification.log";
- static final int verficationLogLimit = 5; // * numBlocks.
+ private final long scanPeriod;
+ private final AtomicLong lastScanTime = new AtomicLong();
- private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
- private DataNode datanode;
+ private final DataNode datanode;
private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
- // sorted set
- private TreeSet<BlockScanInfo> blockInfoSet;
- private HashMap<Block, BlockScanInfo> blockMap;
+ private final SortedSet<BlockScanInfo> blockInfoSet
+ = new TreeSet<BlockScanInfo>();
+ private final Map<Block, BlockScanInfo> blockMap
+ = new HashMap<Block, BlockScanInfo>();
// processedBlocks keeps track of which blocks are scanned
// since the last run.
- private HashMap<Long, Integer> processedBlocks;
+ private volatile HashMap<Long, Integer> processedBlocks;
private long totalScans = 0;
private long totalScanErrors = 0;
private long totalTransientErrors = 0;
- private long totalBlocksScannedInLastRun = 0; // Used for test only
+ private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
private long currentPeriodStart = System.currentTimeMillis();
private long bytesLeft = 0; // Bytes to scan in this period
private long totalBytesToScan = 0;
- private LogFileHandler verificationLog;
+ private final LogFileHandler verificationLog;
- private DataTransferThrottler throttler = null;
+ private final DataTransferThrottler throttler = new DataTransferThrottler(
+ 200, MAX_SCAN_RATE);
private static enum ScanType {
VERIFICATION_SCAN, // scanned as part of periodic verification
@@ -133,29 +133,48 @@ class BlockPoolSliceScanner {
}
}
- BlockPoolSliceScanner(DataNode datanode,
+ BlockPoolSliceScanner(String bpid, DataNode datanode,
FSDatasetInterface<? extends FSVolumeInterface> dataset,
- Configuration conf, String bpid) {
+ Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
this.blockPoolId = bpid;
- scanPeriod = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
+
+ long hours = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT);
- if ( scanPeriod <= 0 ) {
- scanPeriod = DEFAULT_SCAN_PERIOD_HOURS;
+ if (hours <= 0) {
+ hours = DEFAULT_SCAN_PERIOD_HOURS;
}
- scanPeriod *= 3600 * 1000;
- LOG.info("Periodic Block Verification scan initialized with interval " + scanPeriod + ".");
+ this.scanPeriod = hours * 3600 * 1000;
+ LOG.info("Periodic Block Verification Scanner initialized with interval "
+ + hours + " hours for block pool " + bpid + ".");
+
+ // get the list of blocks and arrange them in random order
+ List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
+ Collections.shuffle(arr);
+
+ long scanTime = -1;
+ for (Block block : arr) {
+ BlockScanInfo info = new BlockScanInfo( block );
+ info.lastScanTime = scanTime--;
+ //still keep 'info.lastScanType' to NONE.
+ addBlockInfo(info);
+ }
+
+ RollingLogs rollingLogs = null;
+ try {
+ rollingLogs = dataset.createRollingLogs(blockPoolId, VERIFICATION_PREFIX);
+ } catch (IOException e) {
+ LOG.warn("Could not open verfication log. " +
+ "Verification times are not stored.");
+ }
+ verificationLog = rollingLogs == null? null: new LogFileHandler(rollingLogs);
}
String getBlockPoolId() {
return blockPoolId;
}
- synchronized boolean isInitialized() {
- return throttler != null;
- }
-
private void updateBytesToScan(long len, long lastScanTime) {
// len could be negative when a block is deleted.
totalBytesToScan += len;
@@ -197,51 +216,6 @@ class BlockPoolSliceScanner {
}
}
- void init() throws IOException {
- // get the list of blocks and arrange them in random order
- List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
- Collections.shuffle(arr);
-
- blockInfoSet = new TreeSet<BlockScanInfo>();
- blockMap = new HashMap<Block, BlockScanInfo>();
-
- long scanTime = -1;
- for (Block block : arr) {
- BlockScanInfo info = new BlockScanInfo( block );
- info.lastScanTime = scanTime--;
- //still keep 'info.lastScanType' to NONE.
- addBlockInfo(info);
- }
-
- /* Pick the first directory that has any existing scanner log.
- * otherwise, pick the first directory.
- */
- File dir = null;
- final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
- for (FSVolumeInterface vol : volumes) {
- File bpDir = vol.getDirectory(blockPoolId);
- if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) {
- dir = bpDir;
- break;
- }
- }
- if (dir == null) {
- dir = volumes.get(0).getDirectory(blockPoolId);
- }
-
- try {
- // max lines will be updated later during initialization.
- verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
- } catch (IOException e) {
- LOG.warn("Could not open verfication log. " +
- "Verification times are not stored.");
- }
-
- synchronized (this) {
- throttler = new DataTransferThrottler(200, MAX_SCAN_RATE);
- }
- }
-
private synchronized long getNewBlockScanTime() {
/* If there are a lot of blocks, this returns a random time within
* the scan period. Otherwise something sooner.
@@ -255,10 +229,6 @@ class BlockPoolSliceScanner {
/** Adds block to list of blocks */
synchronized void addBlock(ExtendedBlock block) {
- if (!isInitialized()) {
- return;
- }
-
BlockScanInfo info = blockMap.get(block.getLocalBlock());
if ( info != null ) {
LOG.warn("Adding an already existing block " + block);
@@ -274,20 +244,19 @@ class BlockPoolSliceScanner {
/** Deletes the block from internal structures */
synchronized void deleteBlock(Block block) {
- if (!isInitialized()) {
- return;
- }
BlockScanInfo info = blockMap.get(block);
if ( info != null ) {
delBlockInfo(info);
}
}
- /** @return the last scan time */
+ /** @return the last scan time for the block pool. */
+ long getLastScanTime() {
+ return lastScanTime.get();
+ }
+
+ /** @return the last scan time for the given block. */
synchronized long getLastScanTime(Block block) {
- if (!isInitialized()) {
- return 0;
- }
BlockScanInfo info = blockMap.get(block);
return info == null? 0: info.lastScanTime;
}
@@ -302,9 +271,6 @@ class BlockPoolSliceScanner {
private synchronized void updateScanStatus(Block block,
ScanType type,
boolean scanOk) {
- if (!isInitialized()) {
- return;
- }
BlockScanInfo info = blockMap.get(block);
if ( info != null ) {
@@ -325,9 +291,9 @@ class BlockPoolSliceScanner {
return;
}
- LogFileHandler log = verificationLog;
- if (log != null) {
- log.appendLine(now, block.getGenerationStamp(), block.getBlockId());
+ if (verificationLog != null) {
+ verificationLog.append(now, block.getGenerationStamp(),
+ block.getBlockId());
}
}
@@ -342,6 +308,7 @@ class BlockPoolSliceScanner {
}
static private class LogEntry {
+
long blockId = -1;
long verificationTime = -1;
long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
@@ -355,6 +322,14 @@ class BlockPoolSliceScanner {
private static Pattern entryPattern =
Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
+ static String toString(long verificationTime, long genStamp, long blockId,
+ DateFormat dateFormat) {
+ return "\ndate=\"" + dateFormat.format(new Date(verificationTime))
+ + "\"\t time=\"" + verificationTime
+ + "\"\t genstamp=\"" + genStamp
+ + "\"\t id=\"" + blockId + "\"";
+ }
+
static LogEntry parseEntry(String line) {
LogEntry entry = new LogEntry();
@@ -491,8 +466,8 @@ class BlockPoolSliceScanner {
}
// Used for tests only
- long getBlocksScannedInLastRun() {
- return totalBlocksScannedInLastRun;
+ int getBlocksScannedInLastRun() {
+ return totalBlocksScannedInLastRun.get();
}
/**
@@ -503,33 +478,19 @@ class BlockPoolSliceScanner {
* to exit.
*/
private boolean assignInitialVerificationTimes() {
- int numBlocks = 1;
- LogFileHandler log = null;
- synchronized (this) {
- log = verificationLog;
- numBlocks = Math.max(blockMap.size(), 1);
- }
-
- long now = System.currentTimeMillis();
- LogFileHandler.Reader logReader[] = new LogFileHandler.Reader[2];
- try {
- if (log != null) {
- logReader[0] = log.getCurrentFileReader();
- logReader[1] = log.getPreviousFileReader();
- }
- } catch (IOException e) {
- LOG.warn("Could not read previous verification times", e);
- }
-
- try {
- for (LogFileHandler.Reader reader : logReader) {
- // update verification times from the verificationLog.
- while (logReader != null && reader.hasNext()) {
+ //First updates the last verification times from the log file.
+ if (verificationLog != null) {
+ long now = System.currentTimeMillis();
+ RollingLogs.LineIterator logIterator = null;
+ try {
+ logIterator = verificationLog.logs.iterator(false);
+ // update verification times from the verificationLog.
+ while (logIterator.hasNext()) {
if (!datanode.shouldRun
|| datanode.blockScanner.blockScannerThread.isInterrupted()) {
return false;
}
- LogEntry entry = LogEntry.parseEntry(reader.next());
+ LogEntry entry = LogEntry.parseEntry(logIterator.next());
if (entry != null) {
updateBlockInfo(entry);
if (now - entry.verificationTime < scanPeriod) {
@@ -540,35 +501,35 @@ class BlockPoolSliceScanner {
updateBytesLeft(-info.block.getNumBytes());
processedBlocks.put(entry.blockId, 1);
}
- if (reader.file == log.prevFile) {
+ if (logIterator.isPrevious()) {
// write the log entry to current file
// so that the entry is preserved for later runs.
- log.appendLine(entry.verificationTime, entry.genStamp,
+ verificationLog.append(entry.verificationTime, entry.genStamp,
entry.blockId);
}
}
}
}
}
+ } catch (IOException e) {
+ LOG.warn("Failed to read previous verification times.", e);
+ } finally {
+ IOUtils.closeStream(logIterator);
}
- } finally {
- IOUtils.closeStream(logReader[0]);
- IOUtils.closeStream(logReader[1]);
}
- /* Initially spread the block reads over half of
- * MIN_SCAN_PERIOD so that we don't keep scanning the
- * blocks too quickly when restarted.
- */
- long verifyInterval = (long) (Math.min( scanPeriod/2.0/numBlocks,
- 10*60*1000 ));
- long lastScanTime = System.currentTimeMillis() - scanPeriod;
/* Before this loop, entries in blockInfoSet that are not
* updated above have lastScanTime of <= 0 . Loop until first entry has
* lastModificationTime > 0.
*/
synchronized (this) {
+ final int numBlocks = Math.max(blockMap.size(), 1);
+ // Initially spread the block reads over half of scan period
+ // so that we don't keep scanning the blocks too quickly when restarted.
+ long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
+ long lastScanTime = System.currentTimeMillis() - scanPeriod;
+
if (!blockInfoSet.isEmpty()) {
BlockScanInfo info;
while ((info = blockInfoSet.first()).lastScanTime < 0) {
@@ -586,11 +547,6 @@ class BlockPoolSliceScanner {
private synchronized void updateBytesLeft(long len) {
bytesLeft += len;
}
-
- static File getCurrentFile(FSVolumeInterface vol, String bpid) throws IOException {
- return LogFileHandler.getCurrentFile(vol.getDirectory(bpid),
- BlockPoolSliceScanner.verificationLogFile);
- }
private synchronized void startNewPeriod() {
LOG.info("Starting a new period : work left in prev period : "
@@ -604,26 +560,21 @@ class BlockPoolSliceScanner {
void scanBlockPoolSlice() {
startNewPeriod();
- if (processedBlocks != null) {
- totalBlocksScannedInLastRun = processedBlocks.size();
- }
// Create a new processedBlocks structure
processedBlocks = new HashMap<Long, Integer>();
- if (verificationLog != null) {
- try {
- verificationLog.openCurFile();
- } catch (FileNotFoundException ex) {
- LOG.warn("Could not open current file");
- }
- }
if (!assignInitialVerificationTimes()) {
return;
}
// Start scanning
- scan();
+ try {
+ scan();
+ } finally {
+ totalBlocksScannedInLastRun.set(processedBlocks.size());
+ lastScanTime.set(System.currentTimeMillis());
+ }
}
- public void scan() {
+ private void scan() {
if (LOG.isDebugEnabled()) {
LOG.debug("Starting to scan blockpool: " + blockPoolId);
}
@@ -663,7 +614,7 @@ class BlockPoolSliceScanner {
private synchronized void cleanUp() {
if (verificationLog != null) {
try {
- verificationLog.roll();
+ verificationLog.logs.roll();
} catch (IOException ex) {
LOG.warn("Received exception: ", ex);
verificationLog.close();
@@ -686,7 +637,7 @@ class BlockPoolSliceScanner {
int inScanPeriod = 0;
int neverScanned = 0;
- DateFormat dateFormat = new SimpleDateFormat(dateFormatString);
+ DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
int total = blockInfoSet.size();
@@ -751,191 +702,33 @@ class BlockPoolSliceScanner {
/**
* This class takes care of log file used to store the last verification
- * times of the blocks. It rolls the current file when it is too big etc.
- * If there is an error while writing, it stops updating with an error
- * message.
+ * times of the blocks.
*/
private static class LogFileHandler {
-
- private static final String curFileSuffix = ".curr";
- private static final String prevFileSuffix = ".prev";
- private final DateFormat dateFormat = new SimpleDateFormat(dateFormatString);
-
- static File getCurrentFile(File dir, String filePrefix) {
- return new File(dir, filePrefix + curFileSuffix);
- }
-
- public Reader getPreviousFileReader() throws IOException {
- return new Reader(prevFile);
- }
-
- public Reader getCurrentFileReader() throws IOException {
- return new Reader(curFile);
- }
-
- static boolean isFilePresent(File dir, String filePrefix) {
- return new File(dir, filePrefix + curFileSuffix).exists() ||
- new File(dir, filePrefix + prevFileSuffix).exists();
- }
- private File curFile;
- private File prevFile;
-
- private PrintStream out;
-
- /**
- * Opens the log file for appending.
- * Note that rolling will happen only after "updateLineCount()" is
- * called. This is so that line count could be updated in a separate
- * thread without delaying start up.
- *
- * @param dir where the logs files are located.
- * @param filePrefix prefix of the file.
- * @param maxNumLines max lines in a file (its a soft limit).
- * @throws IOException
- */
- LogFileHandler(File dir, String filePrefix, int maxNumLines)
- throws IOException {
- curFile = new File(dir, filePrefix + curFileSuffix);
- prevFile = new File(dir, filePrefix + prevFileSuffix);
- }
-
- /**
- * Append "\n" + line.
- * If the log file need to be rolled, it will done after
- * appending the text.
- * This does not throw IOException when there is an error while
- * appending. Currently does not throw an error even if rolling
- * fails (may be it should?).
- * return true if append was successful.
- */
- synchronized boolean appendLine(String line) {
- if (out == null) {
- return false;
- }
- out.println();
- out.print(line);
- return true;
- }
-
- boolean appendLine(long verificationTime, long genStamp, long blockId) {
- return appendLine("date=\""
- + dateFormat.format(new Date(verificationTime)) + "\"\t " + "time=\""
- + verificationTime + "\"\t " + "genstamp=\"" + genStamp + "\"\t "
- + "id=\"" + blockId + "\"");
- }
-
- private synchronized void openCurFile() throws FileNotFoundException {
- close();
- out = new PrintStream(new FileOutputStream(curFile, true));
- }
-
- private void roll() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Rolling current file: " + curFile.getAbsolutePath()
- + " to previous file: " + prevFile.getAbsolutePath());
- }
+ private final DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
- if (!prevFile.delete() && prevFile.exists()) {
- throw new IOException("Could not delete " + prevFile);
- }
-
- close();
+ private final RollingLogs logs;
- if (!curFile.renameTo(prevFile)) {
- throw new IOException("Could not rename " + curFile +
- " to " + prevFile);
- }
- }
-
- synchronized void close() {
- if (out != null) {
- out.close();
- out = null;
- }
+ private LogFileHandler(RollingLogs logs) {
+ this.logs = logs;
}
-
- /**
- * This is used to read the lines in order.
- * If the data is not read completely (i.e, untill hasNext() returns
- * false), it needs to be explicitly
- */
- private static class Reader implements Iterator<String>, Closeable {
-
- BufferedReader reader;
- File file;
- String line;
- boolean closed = false;
-
- private Reader(File file) throws IOException {
- reader = null;
- this.file = file;
- readNext();
- }
-
- private boolean openFile() throws IOException {
- if (file == null) {
- return false;
- }
- if (reader != null ) {
- reader.close();
- reader = null;
- }
- if (file.exists()) {
- reader = new BufferedReader(new FileReader(file));
- return true;
- } else {
- return false;
- }
- }
-
- // read next line if possible.
- private void readNext() throws IOException {
- line = null;
- if (reader == null) {
- openFile();
- }
- try {
- if (reader != null && (line = reader.readLine()) != null) {
- return;
- }
- } finally {
- if (!hasNext()) {
- close();
- }
- }
- }
-
- public boolean hasNext() {
- return line != null;
- }
-
- public String next() {
- String curLine = line;
- try {
- readNext();
- } catch (IOException e) {
- LOG.info("Could not read next line in LogHandler", e);
- }
- return curLine;
- }
- public void remove() {
- throw new RuntimeException("remove() is not supported.");
+ void append(long verificationTime, long genStamp, long blockId) {
+ final String m = LogEntry.toString(verificationTime, genStamp, blockId,
+ dateFormat);
+ try {
+ logs.appender().append(m);
+ } catch (IOException e) {
+ LOG.warn("Failed to append to " + logs + ", m=" + m, e);
}
+ }
- public void close() throws IOException {
- if (!closed) {
- try {
- if (reader != null) {
- reader.close();
- }
- } finally {
- file = null;
- reader = null;
- closed = true;
- }
- }
+ void close() {
+ try {
+ logs.appender().close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close the appender of " + logs, e);
}
- }
+ }
}
}
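In the new code above, LogEntry.toString(...) writes one verification record per line and LogEntry.parseEntry(...) reads it back using entryPattern. A standalone sketch of that round trip (the class name and main method are illustrative only, not part of the patch):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogEntryRoundTrip {
  // Same pattern as BlockPoolSliceScanner.LogEntry.entryPattern.
  private static final Pattern ENTRY_PATTERN =
      Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");

  public static void main(String[] args) {
    final SimpleDateFormat dateFormat =
        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
    final long now = System.currentTimeMillis();
    // Same shape as LogEntry.toString(...); the real method prefixes "\n"
    // because each record is appended to the log as a new line.
    final String line = "date=\"" + dateFormat.format(new Date(now))
        + "\"\t time=\"" + now
        + "\"\t genstamp=\"" + 1001
        + "\"\t id=\"" + 42 + "\"";
    final Matcher m = ENTRY_PATTERN.matcher(line);
    while (m.find()) { // prints date, time, genstamp and id in order
      System.out.println(m.group(1) + " = " + m.group(2));
    }
  }
}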
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1299146&r1=1299145&r2=1299146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Sat Mar 10 02:06:00 2012
@@ -18,9 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
-import java.io.File;
import java.io.IOException;
-import java.util.Iterator;
import java.util.TreeMap;
import javax.servlet.http.HttpServlet;
@@ -132,24 +130,14 @@ public class DataBlockScanner implements
waitForInit(currentBpId);
synchronized (this) {
if (getBlockPoolSetSize() > 0) {
- // Find nextBpId by finding the last modified current log file, if any
- long lastScanTime = -1;
- Iterator<String> bpidIterator = blockPoolScannerMap.keySet()
- .iterator();
- while (bpidIterator.hasNext()) {
- String bpid = bpidIterator.next();
- for (FSDatasetInterface.FSVolumeInterface vol : dataset.getVolumes()) {
- try {
- File currFile = BlockPoolSliceScanner.getCurrentFile(vol, bpid);
- if (currFile.exists()) {
- long lastModified = currFile.lastModified();
- if (lastScanTime < lastModified) {
- lastScanTime = lastModified;
- nextBpId = bpid;
- }
- }
- } catch (IOException e) {
- LOG.warn("Received exception: ", e);
+ // Find nextBpId by the minimum of the last scan time
+ long lastScanTime = 0;
+ for (String bpid : blockPoolScannerMap.keySet()) {
+ final long t = getBPScanner(bpid).getLastScanTime();
+ if (t != 0L) {
+ if (nextBpId == null || t < lastScanTime) {
+ lastScanTime = t;
+ nextBpId = bpid;
}
}
}
@@ -157,13 +145,9 @@ public class DataBlockScanner implements
// nextBpId can still be null if no current log is found,
// find nextBpId sequentially.
if (nextBpId == null) {
- if ("".equals(currentBpId)) {
+ nextBpId = blockPoolScannerMap.higherKey(currentBpId);
+ if (nextBpId == null) {
nextBpId = blockPoolScannerMap.firstKey();
- } else {
- nextBpId = blockPoolScannerMap.higherKey(currentBpId);
- if (nextBpId == null) {
- nextBpId = blockPoolScannerMap.firstKey();
- }
}
}
if (nextBpId != null) {
@@ -206,12 +190,8 @@ public class DataBlockScanner implements
}
}
- public synchronized boolean isInitialized(String bpid) {
- BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
- if (bpScanner != null) {
- return bpScanner.isInitialized();
- }
- return false;
+ boolean isInitialized(String bpid) {
+ return getBPScanner(bpid) != null;
}
public synchronized void printBlockReport(StringBuilder buffer,
@@ -260,14 +240,8 @@ public class DataBlockScanner implements
if (blockPoolScannerMap.get(blockPoolId) != null) {
return;
}
- BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(datanode, dataset,
- conf, blockPoolId);
- try {
- bpScanner.init();
- } catch (IOException ex) {
- LOG.warn("Failed to initialized block scanner for pool id="+blockPoolId);
- return;
- }
+ BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
+ datanode, dataset, conf);
blockPoolScannerMap.put(blockPoolId, bpScanner);
LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
+ blockPoolScannerMap.size());
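The hunks above replace probing for the most recently modified log file with an in-memory policy: pick the block pool with the smallest nonzero last scan time, else take the next pool id in sorted order. A standalone sketch of that selection policy, with hypothetical pool ids and times:

import java.util.TreeMap;

public class NextBpIdSketch {
  static String next(TreeMap<String, Long> lastScanTimes, String currentBpId) {
    String nextBpId = null;
    long lastScanTime = 0;
    // Minimum of the nonzero last scan times.
    for (String bpid : lastScanTimes.keySet()) {
      final long t = lastScanTimes.get(bpid);
      if (t != 0L) {
        if (nextBpId == null || t < lastScanTime) {
          lastScanTime = t;
          nextBpId = bpid;
        }
      }
    }
    // Fall back to sequential order, wrapping to the first key.
    if (nextBpId == null && !lastScanTimes.isEmpty()) {
      nextBpId = lastScanTimes.higherKey(currentBpId);
      if (nextBpId == null) {
        nextBpId = lastScanTimes.firstKey();
      }
    }
    return nextBpId;
  }

  public static void main(String[] args) {
    final TreeMap<String, Long> t = new TreeMap<String, Long>();
    t.put("BP-1", 0L);   // never scanned yet
    t.put("BP-2", 500L);
    t.put("BP-3", 300L);
    System.out.println(next(t, "")); // BP-3, the smallest nonzero time
  }
}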
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1299146&r1=1299145&r2=1299146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Mar 10 02:06:00 2012
@@ -38,7 +38,6 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
@@ -48,8 +47,6 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
@@ -137,7 +134,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
@@ -393,7 +389,7 @@ public class DataNode extends Configured
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
- public DataBlockScanner blockScanner = null;
+ volatile DataBlockScanner blockScanner = null;
private DirectoryScanner directoryScanner = null;
/** Activated plug-ins. */
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1299146&r1=1299145&r2=1299146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Mar 10 02:06:00 2012
@@ -18,13 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedInputStream;
+import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
+import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
+import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
@@ -37,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -612,8 +616,8 @@ class FSDataset implements FSDatasetInte
}
@Override
- public File getDirectory(String bpid) throws IOException {
- return getBlockPoolSlice(bpid).getDirectory();
+ public String getPath(String bpid) throws IOException {
+ return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
}
@Override
@@ -2301,7 +2305,7 @@ class FSDataset implements FSDatasetInte
DataNode.LOG.warn("Metadata file in memory "
+ memMetaFile.getAbsolutePath()
+ " does not match file found by scan "
- + diskMetaFile.getAbsolutePath());
+ + (diskMetaFile == null? null: diskMetaFile.getAbsolutePath()));
}
} else {
// Metadata file corresponding to block in memory is missing
@@ -2612,4 +2616,220 @@ class FSDataset implements FSDatasetInte
datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info;
}
+
+ @Override
+ public RollingLogs createRollingLogs(String bpid, String prefix
+ ) throws IOException {
+ String dir = null;
+ final List<FSVolume> volumes = getVolumes();
+ for (FSVolume vol : volumes) {
+ String bpDir = vol.getPath(bpid);
+ if (RollingLogsImpl.isFilePresent(bpDir, prefix)) {
+ dir = bpDir;
+ break;
+ }
+ }
+ if (dir == null) {
+ dir = volumes.get(0).getPath(bpid);
+ }
+ return new RollingLogsImpl(dir, prefix);
+ }
+
+ static class RollingLogsImpl implements RollingLogs {
+ private static final String CURR_SUFFIX = ".curr";
+ private static final String PREV_SUFFIX = ".prev";
+
+ static boolean isFilePresent(String dir, String filePrefix) {
+ return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
+ new File(dir, filePrefix + PREV_SUFFIX).exists();
+ }
+
+ private final File curr;
+ private final File prev;
+ private PrintStream out; //require synchronized access
+
+ private Appender appender = new Appender() {
+ @Override
+ public Appendable append(CharSequence csq) {
+ synchronized(RollingLogsImpl.this) {
+ if (out == null) {
+ throw new IllegalStateException(RollingLogsImpl.this
+ + " is not yet opened.");
+ }
+ out.print(csq);
+ }
+ return this;
+ }
+
+ @Override
+ public Appendable append(char c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Appendable append(CharSequence csq, int start, int end) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ synchronized(RollingLogsImpl.this) {
+ if (out != null) {
+ out.close();
+ out = null;
+ }
+ }
+ }
+ };
+
+
+ private final AtomicInteger numReaders = new AtomicInteger();
+
+ private RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException {
+ curr = new File(dir, filePrefix + CURR_SUFFIX);
+ prev = new File(dir, filePrefix + PREV_SUFFIX);
+ out = new PrintStream(new FileOutputStream(curr, true));
+ }
+
+ @Override
+ public Reader iterator(boolean skipPrevFile) throws IOException {
+ numReaders.incrementAndGet();
+ return new Reader(skipPrevFile);
+ }
+
+ @Override
+ public Appender appender() {
+ return appender;
+ }
+
+ @Override
+ public boolean roll() throws IOException {
+ if (numReaders.get() > 0) {
+ return false;
+ }
+ if (!prev.delete() && prev.exists()) {
+ throw new IOException("Failed to delete " + prev);
+ }
+
+ synchronized(this) {
+ appender.close();
+ final boolean renamed = curr.renameTo(prev);
+ out = new PrintStream(new FileOutputStream(curr, true));
+ if (!renamed) {
+ throw new IOException("Failed to rename " + curr + " to " + prev);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return curr.toString();
+ }
+
+ /**
+ * This is used to read the lines in order.
+ * If the data is not read completely (i.e., until hasNext() returns
+ * false), it needs to be explicitly closed.
+ */
+ private class Reader implements RollingLogs.LineIterator {
+ private File file;
+ private BufferedReader reader;
+ private String line;
+ private boolean closed = false;
+
+ private Reader(boolean skipPrevFile) throws IOException {
+ reader = null;
+ file = skipPrevFile? curr : prev;
+ readNext();
+ }
+
+ @Override
+ public boolean isPrevious() {
+ return file == prev;
+ }
+
+ private boolean openFile() throws IOException {
+
+ for(int i=0; i<2; i++) {
+ if (reader != null || i > 0) {
+ // move to next file
+ file = isPrevious()? curr : null;
+ }
+ if (file == null) {
+ return false;
+ }
+ if (file.exists()) {
+ break;
+ }
+ }
+
+ if (reader != null ) {
+ reader.close();
+ reader = null;
+ }
+
+ reader = new BufferedReader(new FileReader(file));
+ return true;
+ }
+
+ // read next line if possible.
+ private void readNext() throws IOException {
+ line = null;
+ try {
+ if (reader != null && (line = reader.readLine()) != null) {
+ return;
+ }
+ if (line == null) {
+ // move to the next file.
+ if (openFile()) {
+ readNext();
+ }
+ }
+ } finally {
+ if (!hasNext()) {
+ close();
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return line != null;
+ }
+
+ @Override
+ public String next() {
+ String curLine = line;
+ try {
+ readNext();
+ } catch (IOException e) {
+ DataBlockScanner.LOG.warn("Failed to read next line.", e);
+ }
+ return curLine;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ try {
+ if (reader != null) {
+ reader.close();
+ }
+ } finally {
+ file = null;
+ reader = null;
+ closed = true;
+ final int n = numReaders.decrementAndGet();
+ assert(n >= 0);
+ }
+ }
+ }
+ }
+ }
}
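Note that RollingLogsImpl.roll() above returns false while any reader is open, because numReaders stays nonzero until every LineIterator is closed. A hedged sketch of the expected calling pattern (the class and method names here are illustrative, not part of the patch):

import java.io.IOException;

class RollingLogsUsageSketch {
  static void drainAndRoll(RollingLogs logs) throws IOException {
    final RollingLogs.LineIterator it = logs.iterator(false); // .prev, then .curr
    try {
      while (it.hasNext()) {
        final String line = it.next(); // e.g. feed to LogEntry.parseEntry(line)
      }
    } finally {
      it.close(); // decrements numReaders so that roll() can succeed
    }
    if (!logs.roll()) {
      // Another reader is still open; entries keep going to the same .curr file.
    }
  }
}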
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1299146&r1=1299145&r2=1299146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Mar 10 02:06:00 2012
@@ -88,13 +88,21 @@ public interface FSDatasetInterface<V ex
/** @return the available storage space in bytes. */
public long getAvailable() throws IOException;
- /** @return the directory for the block pool. */
- public File getDirectory(String bpid) throws IOException;
+ /** @return the path to the volume */
+ public String getPath(String bpid) throws IOException;
/** @return the directory for the finalized blocks in the block pool. */
public File getFinalizedDir(String bpid) throws IOException;
}
+ /**
+ * Create rolling logs.
+ *
+ * @param prefix the prefix of the log names.
+ * @return rolling logs
+ */
+ public RollingLogs createRollingLogs(String bpid, String prefix) throws IOException;
+
/** @return a list of volumes. */
public List<V> getVolumes();
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=1299146&r1=1299145&r2=1299146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Sat Mar 10 02:06:00 2012
@@ -18,27 +18,28 @@
package org.apache.hadoop.hdfs;
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.io.*;
-import java.util.Random;
+
+import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import junit.framework.TestCase;
-
/**
* This test verifies that block verification occurs on the datanode
*/
@@ -392,7 +393,7 @@ public class TestDatanodeBlockScanner ex
}
private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex,
- long timeout) throws IOException, TimeoutException, InterruptedException {
+ long timeout) throws TimeoutException, InterruptedException {
File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
long failtime = System.currentTimeMillis()
+ ((timeout > 0) ? timeout : Long.MAX_VALUE);
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1299146&r1=1299145&r2=1299146&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Mar 10 02:06:00 2012
@@ -435,7 +435,7 @@ public class SimulatedFSDataset
}
@Override // FSDatasetInterface
- public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
+ public synchronized void unfinalizeBlock(ExtendedBlock b) {
if (isValidRbw(b)) {
blockMap.remove(b.getLocalBlock());
}
@@ -456,12 +456,12 @@ public class SimulatedFSDataset
}
@Override // FSDatasetMBean
- public long getCapacity() throws IOException {
+ public long getCapacity() {
return storage.getCapacity();
}
@Override // FSDatasetMBean
- public long getDfsUsed() throws IOException {
+ public long getDfsUsed() {
return storage.getUsed();
}
@@ -471,7 +471,7 @@ public class SimulatedFSDataset
}
@Override // FSDatasetMBean
- public long getRemaining() throws IOException {
+ public long getRemaining() {
return storage.getFree();
}
@@ -938,13 +938,13 @@ public class SimulatedFSDataset
@Override // FSDatasetInterface
public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
- long newlength) throws IOException {
+ long newlength) {
return new FinalizedReplica(
oldBlock.getBlockId(), newlength, recoveryId, null, null);
}
@Override // FSDatasetInterface
- public long getReplicaVisibleLength(ExtendedBlock block) throws IOException {
+ public long getReplicaVisibleLength(ExtendedBlock block) {
return block.getNumBytes();
}
@@ -1013,4 +1013,9 @@ public class SimulatedFSDataset
public Map<String, Object> getVolumeInfoMap() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public RollingLogs createRollingLogs(String bpid, String prefix) {
+ throw new UnsupportedOperationException();
+ }
}