Author: szetszwo
Date: Fri Jul 27 06:27:08 2012
New Revision: 1366293
URL: http://svn.apache.org/viewvc?rev=1366293&view=rev
Log:
HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations to
get around a Java library bug causing OutOfMemoryError.
Added:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1366293&r1=1366292&r2=1366293&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Fri Jul 27 06:27:08 2012
@@ -83,6 +83,9 @@ Release 0.23.3 - UNRELEASED
HDFS-3626. Creating file with invalid path can corrupt edit log (todd)
+ HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations
+ to get around a Java library bug causing OutOfMemoryError. (szetszwo)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1366293&r1=1366292&r2=1366293&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
Fri Jul 27 06:27:08 2012
@@ -412,6 +412,7 @@ public class WebHdfsFileSystem extends F
//Step 2) Submit another Http request with the URL from the Location
header with data.
conn = (HttpURLConnection)new URL(redirect).openConnection();
conn.setRequestMethod(op.getType().toString());
+ conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
return conn;
}
@@ -824,8 +825,7 @@ public class WebHdfsFileSystem extends F
}
private static WebHdfsFileSystem getWebHdfs(
- final Token<?> token, final Configuration conf
- ) throws IOException, InterruptedException, URISyntaxException {
+ final Token<?> token, final Configuration conf) throws IOException {
final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
@@ -839,12 +839,7 @@ public class WebHdfsFileSystem extends F
// update the kerberos credentials, if they are coming from a keytab
ugi.reloginFromKeytab();
- try {
- WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
- return webhdfs.renewDelegationToken(token);
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
+ return getWebHdfs(token, conf).renewDelegationToken(token);
}
@Override
@@ -854,12 +849,7 @@ public class WebHdfsFileSystem extends F
// update the kerberos credentials, if they are coming from a keytab
ugi.checkTGTAndReloginFromKeytab();
- try {
- final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
- webhdfs.cancelDelegationToken(token);
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
+ getWebHdfs(token, conf).cancelDelegationToken(token);
}
}
Added:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java?rev=1366293&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
(added)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
Fri Jul 27 06:27:08 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test WebHDFS */
+public class TestWebHDFS {
+ static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
+
+ static final Random RANDOM = new Random();
+
+ static final long systemStartTime = System.nanoTime();
+
+ /** A timer for measuring performance. */
+ static class Ticker {
+ final String name;
+ final long startTime = System.nanoTime();
+ private long previousTick = startTime;
+
+ Ticker(final String name, String format, Object... args) {
+ this.name = name;
+ LOG.info(String.format("\n\n%s START: %s\n",
+ name, String.format(format, args)));
+ }
+
+ void tick(final long nBytes, String format, Object... args) {
+ final long now = System.nanoTime();
+ if (now - previousTick > 10000000000L) {
+ previousTick = now;
+ final double mintues = (now - systemStartTime)/60000000000.0;
+ LOG.info(String.format("\n\n%s %.2f min) %s %s\n", name, mintues,
+ String.format(format, args), toMpsString(nBytes, now)));
+ }
+ }
+
+ void end(final long nBytes) {
+ final long now = System.nanoTime();
+ final double seconds = (now - startTime)/1000000000.0;
+ LOG.info(String.format("\n\n%s END: duration=%.2fs %s\n",
+ name, seconds, toMpsString(nBytes, now)));
+ }
+
+ String toMpsString(final long nBytes, final long now) {
+ final double mb = nBytes/(double)(1<<20);
+ final double mps = mb*1000000000.0/(now - startTime);
+ return String.format("[nBytes=%.2fMB, speed=%.2fMB/s]", mb, mps);
+ }
+ }
+
+ @Test
+ public void testLargeFile() throws Exception {
+ largeFileTest(200L << 20); //200MB file length
+ }
+
+ /** Test read and write large files. */
+ static void largeFileTest(final long fileLength) throws Exception {
+ final Configuration conf = WebHdfsTestUtil.createConf();
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(3)
+ .build();
+ try {
+ cluster.waitActive();
+
+ final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf);
+ final Path dir = new Path("/test/largeFile");
+ Assert.assertTrue(fs.mkdirs(dir));
+
+ final byte[] data = new byte[1 << 20];
+ RANDOM.nextBytes(data);
+
+ final byte[] expected = new byte[2 * data.length];
+ System.arraycopy(data, 0, expected, 0, data.length);
+ System.arraycopy(data, 0, expected, data.length, data.length);
+
+ final Path p = new Path(dir, "file");
+ final Ticker t = new Ticker("WRITE", "fileLength=" + fileLength);
+ final FSDataOutputStream out = fs.create(p);
+ try {
+ long remaining = fileLength;
+ for(; remaining > 0;) {
+ t.tick(fileLength - remaining, "remaining=%d", remaining);
+
+ final int n = (int)Math.min(remaining, data.length);
+ out.write(data, 0, n);
+ remaining -= n;
+ }
+ } finally {
+ out.close();
+ }
+ t.end(fileLength);
+
+ Assert.assertEquals(fileLength, fs.getFileStatus(p).getLen());
+
+ final long smallOffset = RANDOM.nextInt(1 << 20) + (1 << 20);
+ final long largeOffset = fileLength - smallOffset;
+ final byte[] buf = new byte[data.length];
+
+ verifySeek(fs, p, largeOffset, fileLength, buf, expected);
+ verifySeek(fs, p, smallOffset, fileLength, buf, expected);
+
+ verifyPread(fs, p, largeOffset, fileLength, buf, expected);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ static void checkData(long offset, long remaining, int n,
+ byte[] actual, byte[] expected) {
+ if (RANDOM.nextInt(100) == 0) {
+ int j = (int)(offset % actual.length);
+ for(int i = 0; i < n; i++) {
+ if (expected[j] != actual[i]) {
+ Assert.fail("expected[" + j + "]=" + expected[j]
+ + " != actual[" + i + "]=" + actual[i]
+ + ", offset=" + offset + ", remaining=" + remaining + ", n=" +
n);
+ }
+ j++;
+ }
+ }
+ }
+
+ /** test seek */
+ static void verifySeek(FileSystem fs, Path p, long offset, long length,
+ byte[] buf, byte[] expected) throws IOException {
+ long remaining = length - offset;
+ long checked = 0;
+ LOG.info("XXX SEEK: offset=" + offset + ", remaining=" + remaining);
+
+ final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d",
+ offset, remaining);
+ final FSDataInputStream in = fs.open(p, 64 << 10);
+ in.seek(offset);
+ for(; remaining > 0; ) {
+ t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
+ final int n = (int)Math.min(remaining, buf.length);
+ in.readFully(buf, 0, n);
+ checkData(offset, remaining, n, buf, expected);
+
+ offset += n;
+ remaining -= n;
+ checked += n;
+ }
+ in.close();
+ t.end(checked);
+ }
+
+ static void verifyPread(FileSystem fs, Path p, long offset, long length,
+ byte[] buf, byte[] expected) throws IOException {
+ long remaining = length - offset;
+ long checked = 0;
+ LOG.info("XXX PREAD: offset=" + offset + ", remaining=" + remaining);
+
+ final Ticker t = new Ticker("PREAD", "offset=%d, remaining=%d",
+ offset, remaining);
+ final FSDataInputStream in = fs.open(p, 64 << 10);
+ for(; remaining > 0; ) {
+ t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
+ final int n = (int)Math.min(remaining, buf.length);
+ in.readFully(offset, buf, 0, n);
+ checkData(offset, remaining, n, buf, expected);
+
+ offset += n;
+ remaining -= n;
+ checked += n;
+ }
+ in.close();
+ t.end(checked);
+ }
+}
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java?rev=1366293&r1=1366292&r2=1366293&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
Fri Jul 27 06:27:08 2012
@@ -40,6 +40,12 @@ import org.junit.Assert;
public class WebHdfsTestUtil {
public static final Log LOG = LogFactory.getLog(WebHdfsTestUtil.class);
+ public static Configuration createConf() {
+ final Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+ return conf;
+ }
+
public static WebHdfsFileSystem getWebHdfsFileSystem(final Configuration conf
) throws IOException, URISyntaxException {
final String uri = WebHdfsFileSystem.SCHEME + "://"