Author: suresh
Date: Thu Jan 3 21:35:01 2013
New Revision: 1428606
URL: http://svn.apache.org/viewvc?rev=1428606&view=rev
Log:
HDFS-4297. Fix issues related to datanode concurrent reading and writing on
Windows. Contributed by Arpit Agarwal and Chuan Liu.
Modified:
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/CHANGES.branch-trunk-win.txt
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
Modified:
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/CHANGES.branch-trunk-win.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/CHANGES.branch-trunk-win.txt?rev=1428606&r1=1428605&r2=1428606&view=diff
==============================================================================
---
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/CHANGES.branch-trunk-win.txt
(original)
+++
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/CHANGES.branch-trunk-win.txt
Thu Jan 3 21:35:01 2013
@@ -8,3 +8,6 @@ branch-trunk-win changes - unreleased
HDFS-4316. branch-trunk-win contains test code accidentally added during
work on fixing tests on Windows. (Chris Nauroth via suresh)
+
+ HDFS-4297. Fix issues related to datanode concurrent reading and writing on
+ Windows. (Arpit Agarwal, Chuan Liu via suresh)
Modified:
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1428606&r1=1428605&r2=1428606&view=diff
==============================================================================
---
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
(original)
+++
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
Thu Jan 3 21:35:01 2013
@@ -41,6 +41,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -90,6 +92,15 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private
class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
+ private final static boolean isNativeIOAvailable;
+ static {
+ isNativeIOAvailable = NativeIO.isAvailable();
+ if (Path.WINDOWS && !isNativeIOAvailable) {
+ LOG.warn("Data node cannot fully support concurrent reading"
+ + " and writing without native code extensions on Windows.");
+ }
+ }
+
@Override // FsDatasetSpi
public List<FsVolumeImpl> getVolumes() {
@@ -147,6 +158,11 @@ class FsDatasetImpl implements FsDataset
if (meta == null || !meta.exists()) {
return null;
}
+ if (isNativeIOAvailable) {
+ return new LengthInputStream(
+ NativeIO.getShareDeleteFileInputStream(meta),
+ meta.length());
+ }
return new LengthInputStream(new FileInputStream(meta), meta.length());
}
@@ -322,18 +338,22 @@ class FsDatasetImpl implements FsDataset
public InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
File blockFile = getBlockFileNoExistsCheck(b);
- RandomAccessFile blockInFile;
- try {
- blockInFile = new RandomAccessFile(blockFile, "r");
- } catch (FileNotFoundException fnfe) {
- throw new IOException("Block " + b + " is not valid. " +
- "Expected block file at " + blockFile + " does not exist.");
- }
+ if (isNativeIOAvailable) {
+ return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+ } else {
+ RandomAccessFile blockInFile;
+ try {
+ blockInFile = new RandomAccessFile(blockFile, "r");
+ } catch (FileNotFoundException fnfe) {
+ throw new IOException("Block " + b + " is not valid. " +
+ "Expected block file at " + blockFile + " does not exist.");
+ }
- if (seekOffset > 0) {
- blockInFile.seek(seekOffset);
+ if (seekOffset > 0) {
+ blockInFile.seek(seekOffset);
+ }
+ return new FileInputStream(blockInFile.getFD());
}
- return new FileInputStream(blockInFile.getFD());
}
/**
Modified:
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileConcurrentReader.java?rev=1428606&r1=1428605&r2=1428606&view=diff
==============================================================================
---
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
(original)
+++
hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
Thu Jan 3 21:35:01 2013
@@ -338,33 +338,33 @@ public class TestFileConcurrentReader {
final AtomicBoolean writerDone = new AtomicBoolean(false);
final AtomicBoolean writerStarted = new AtomicBoolean(false);
final AtomicBoolean error = new AtomicBoolean(false);
- final FSDataOutputStream initialOutputStream = fileSystem.create(file);
- final Thread writer = new Thread(new Runnable() {
- private FSDataOutputStream outputStream = initialOutputStream;
+ final Thread writer = new Thread(new Runnable() {
@Override
public void run() {
try {
- for (int i = 0; !error.get() && i < numWrites; i++) {
- try {
+ FSDataOutputStream outputStream = fileSystem.create(file);
+ if (syncType == SyncType.APPEND) {
+ outputStream.close();
+ outputStream = fileSystem.append(file);
+ }
+ try {
+ for (int i = 0; !error.get() && i < numWrites; i++) {
final byte[] writeBuf =
- DFSTestUtil.generateSequentialBytes(i * writeSize, writeSize);
+            DFSTestUtil.generateSequentialBytes(i * writeSize,
+              writeSize);
outputStream.write(writeBuf);
if (syncType == SyncType.SYNC) {
outputStream.hflush();
- } else { // append
- outputStream.close();
- outputStream = fileSystem.append(file);
}
writerStarted.set(true);
- } catch (IOException e) {
- error.set(true);
- LOG.error("error writing to file", e);
}
+ } catch (IOException e) {
+ error.set(true);
+ LOG.error("error writing to file", e);
+ } finally {
+ outputStream.close();
}
-
writerDone.set(true);
- outputStream.close();
} catch (Exception e) {
LOG.error("error in writer", e);
@@ -415,7 +415,6 @@ public class TestFileConcurrentReader {
Thread.currentThread().interrupt();
}
- initialOutputStream.close();
}
private boolean validateSequentialBytes(byte[] buf, int startPos, int len) {