Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java?rev=1236049&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java Thu Jan 26 06:50:00 2012 @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.IOUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.*; + +public class TestThrottledInputStream { + private static final Log LOG = LogFactory.getLog(TestThrottledInputStream.class); + private static final int BUFF_SIZE = 1024; + + private enum CB {ONE_C, BUFFER, BUFF_OFFSET} + + @Test + public void testRead() { + File tmpFile; + File outFile; + try { + tmpFile = createFile(1024); + outFile = createFile(); + + tmpFile.deleteOnExit(); + outFile.deleteOnExit(); + + long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.BUFFER); + + copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFFER); +/* + copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.BUFFER); + copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.BUFFER); +*/ + + copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFF_OFFSET); +/* + copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.BUFF_OFFSET); + copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.BUFF_OFFSET); +*/ + + copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.ONE_C); +/* + copyAndAssert(tmpFile, outFile, maxBandwidth, 10, 0, CB.ONE_C); + copyAndAssert(tmpFile, outFile, maxBandwidth, 50, 0, CB.ONE_C); +*/ + } catch (IOException e) { + LOG.error("Exception encountered ", e); + } + } + + private long copyAndAssert(File tmpFile, File outFile, + long maxBandwidth, float factor, + int sleepTime, CB flag) throws IOException { + long bandwidth; + ThrottledInputStream in; + long maxBPS = (long) (maxBandwidth / factor); + + if (maxBandwidth == 0) { + in = new ThrottledInputStream(new FileInputStream(tmpFile)); + } else { + in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS); + } + OutputStream out = new FileOutputStream(outFile); + try { + if (flag == CB.BUFFER) { + copyBytes(in, out, BUFF_SIZE); + } else if (flag == CB.BUFF_OFFSET){ + copyBytesWithOffset(in, out, BUFF_SIZE); + } else { + copyByteByByte(in, out); + } + + LOG.info(in); + bandwidth = in.getBytesPerSec(); + Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length()); + Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2)); + Assert.assertTrue(in.getTotalSleepTime() > sleepTime || in.getBytesPerSec() <= maxBPS); + } finally { + IOUtils.closeStream(in); + IOUtils.closeStream(out); + } + return bandwidth; + } + + private static void copyBytesWithOffset(InputStream in, OutputStream out, int buffSize) + throws IOException { + + byte buf[] = new byte[buffSize]; + int bytesRead = in.read(buf, 0, buffSize); + while (bytesRead >= 0) { + out.write(buf, 0, bytesRead); + bytesRead = in.read(buf); + } + } + + private static void copyByteByByte(InputStream in, OutputStream out) + throws IOException { + + int ch = in.read(); + while (ch >= 0) { + out.write(ch); + ch = in.read(); + } + } + + private static void copyBytes(InputStream in, OutputStream out, int buffSize) + throws IOException { + + byte buf[] = new byte[buffSize]; + int bytesRead = in.read(buf); + while (bytesRead >= 0) { + out.write(buf, 0, bytesRead); + bytesRead = in.read(buf); + } + } + + private File createFile(long sizeInKB) throws IOException { + File tmpFile = createFile(); + writeToFile(tmpFile, sizeInKB); + return tmpFile; + } + + private File createFile() throws IOException { + return File.createTempFile("tmp", "dat"); + } + + private void writeToFile(File tmpFile, long sizeInKB) throws IOException { + OutputStream out = new FileOutputStream(tmpFile); + try { + byte[] buffer = new byte [1024]; + for (long index = 0; index < sizeInKB; index++) { + out.write(buffer); + } + } finally { + IOUtils.closeStream(out); + } + } +}
Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml?rev=1236049&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml (added) +++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml Thu Jan 26 06:50:00 2012 @@ -0,0 +1,57 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<configuration> + +<property> + <name>ssl.client.truststore.location</name> + <value>/path/to/truststore/keys/keystore.jks</value> + <description>Truststore to be used by clients like distcp. Must be + specified. + </description> +</property> + +<property> + <name>ssl.client.truststore.password</name> + <value>changeit</value> + <description>Optional. Default value is "". + </description> +</property> + +<property> + <name>ssl.client.truststore.type</name> + <value>jks</value> + <description>Optional. Default value is "jks". + </description> +</property> + +<property> + <name>ssl.client.keystore.location</name> + <value>/path/to/keystore/keys/keystore.jks</value> + <description>Keystore to be used by clients like distcp. Must be + specified. + </description> +</property> + +<property> + <name>ssl.client.keystore.password</name> + <value>changeit</value> + <description>Optional. Default value is "". + </description> +</property> + +<property> + <name>ssl.client.keystore.keypassword</name> + <value>changeit</value> + <description>Optional. Default value is "". + </description> +</property> + +<property> + <name>ssl.client.keystore.type</name> + <value>jks</value> + <description>Optional. Default value is "jks". + </description> +</property> + +</configuration> Modified: hadoop/common/branches/branch-0.23/hadoop-tools/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/pom.xml?rev=1236049&r1=1236048&r2=1236049&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-tools/pom.xml (original) +++ hadoop/common/branches/branch-0.23/hadoop-tools/pom.xml Thu Jan 26 06:50:00 2012 @@ -29,6 +29,7 @@ <modules> <module>hadoop-streaming</module> + <module>hadoop-distcp</module> <module>hadoop-archives</module> <module>hadoop-rumen</module> <module>hadoop-tools-dist</module>
