mukund-thakur commented on a change in pull request #3446:
URL: https://github.com/apache/hadoop/pull/3446#discussion_r710229393
##########
File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * UTs to test {@link DataBlocks} functionalities.
+ */
+public class TestDataBlocks {
+  private final Configuration configuration = new Configuration();
+  private static final int ONE_KB = 1024;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDataBlocks.class);
+
+  /**
+   * Test to verify different DataBlocks factories, different operations.
+   */
+  @Test
+  public void testDataBlocksFactory() throws Exception {
+    testCreateFactory(DATA_BLOCKS_BUFFER_DISK);
+    testCreateFactory(DATA_BLOCKS_BUFFER_ARRAY);
+    testCreateFactory(DATA_BLOCKS_BYTEBUFFER);
+  }
+
+  /**
+   * Verify creation of a data block factory and it's operations.
+   *
+   * @param nameOfFactory Name of the DataBlock factory to be created.
+   * @throws IOException Throw IOE in case of failure while creating a block.
+   */
+  public void testCreateFactory(String nameOfFactory) throws Exception {
+    LOG.info("Testing: {}", nameOfFactory);
+    DataBlocks.BlockFactory diskFactory =

Review comment:
   nit: change the name to blockFactory, as it is not always a disk factory.

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * A class to provide disk, byteBuffer and byteArray option for Filesystem
+ * OutputStreams.
+ * <ul>
+ * <li>
+ * Disk: Uses Disk space to write the blocks. Is suited best to avoid
+ * OutOfMemory Errors in Java heap space.
+ * </li>
+ * <li>
+ * byteBuffer: Uses DirectByteBuffer to allocate memory off-heap to
+ * provide faster writing of DataBlocks with some risk of running
+ * OutOfMemory.
+ * </li>
+ * <li>
+ * byteArray: Uses byte[] to write a block of data. On heap and does have
+ * risk of running OutOfMemory fairly easily.
+ * </li>
+ * </ul>
+ * <p>
+ * Implementation of DataBlocks taken from HADOOP-13560 to support huge file
+ * uploads in S3A with different options rather than one.
+ */
+public final class DataBlocks {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataBlocks.class);
+
+  /**
+   * Buffer blocks to disk.
+   * Capacity is limited to available disk space.
+   */
+  public static final String DATA_BLOCKS_BUFFER_DISK = "disk";
+
+  /**
+   * Use a byte buffer.
+   */
+  public static final String DATA_BLOCKS_BYTEBUFFER = "bytebuffer";
+
+  /**
+   * Use an in-memory array. Fast but will run of heap rapidly.
+   */
+  public static final String DATA_BLOCKS_BUFFER_ARRAY = "array";
+
+  private DataBlocks() {
+  }
+
+  /**
+   * Validate args to a write command. These are the same validation checks
+   * expected for any implementation of {@code OutputStream.write()}.
+   *
+   * @param b byte array containing data.
+   * @param off offset in array where to start.
+   * @param len number of bytes to be written.
+   * @throws NullPointerException for a null buffer
+   * @throws IndexOutOfBoundsException if indices are out of range
+   */
+  public static void validateWriteArgs(byte[] b, int off, int len)
+      throws IOException {
+    Preconditions.checkNotNull(b);
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException(
+          "write (b[" + b.length + "], " + off + ", " + len + ')');
+    }
+  }
+
+  /**
+   * Create a factory.
+   *
+   * @param keyToBufferDir Key to buffer directory config for a FS.
+   * @param configuration factory configurations.
+   * @param name factory name -the option from {@link CommonConfigurationKeys}.
+   * @return the factory, ready to be initialized.
+   * @throws IllegalArgumentException if the name is unknown.
+   */
+  public static BlockFactory createFactory(String keyToBufferDir,
+      Configuration configuration,
+      String name) {
+    LOG.debug("Creating DataFactory of type : {}", name);
+    switch (name) {
+    case DATA_BLOCKS_BUFFER_ARRAY:
+      return new ArrayBlockFactory(keyToBufferDir, configuration);
+    case DATA_BLOCKS_BUFFER_DISK:
+      return new DiskBlockFactory(keyToBufferDir, configuration);
+    case DATA_BLOCKS_BYTEBUFFER:
+      return new ByteBufferBlockFactory(keyToBufferDir, configuration);
+    default:
+      throw new IllegalArgumentException("Unsupported block buffer" +
+          " \"" + name + '"');
+    }
+  }
+
+  /**
+   * The output information for an upload.
+   * It can be one of a file, an input stream or a byteArray.
+   * {@link #toByteArray()} method to be used to convert the data into byte
+   * array to be doen in this class as well.

Review comment:
   nit: typo: "doen" should be "done".

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assume;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
+
+/**
+ * Testing Huge file for AbfsOutputStream.
+ */
+@RunWith(Parameterized.class)
+public class ITestAbfsHugeFiles extends AbstractAbfsScaleTest {

Review comment:
   The buffering mechanism for this test will be disk, since that is the default, right? Can we parameterize this test to run with bytebuffer and array as well?
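   A rough sketch of how that parameterization might look. Assumptions for illustration only: the key name "fs.azure.data.blocks.buffer" and the getRawConfiguration()/setup() hooks stand in for whatever option this patch actually defines to select the DataBlocks buffer type.

```java
// Inside ITestAbfsHugeFiles: run every size against all three buffer types.
@Parameterized.Parameters(name = "Size={0}-BufferType={1}")
public static Iterable<Object[]> sizes() {
  return Arrays.asList(new Object[][] {
      { DEFAULT_WRITE_BUFFER_SIZE, "disk" },
      { getHugeFileUploadValue(), "disk" },
      { DEFAULT_WRITE_BUFFER_SIZE, "bytebuffer" },
      { getHugeFileUploadValue(), "bytebuffer" },
      { DEFAULT_WRITE_BUFFER_SIZE, "array" },
      { getHugeFileUploadValue(), "array" } });
}

private final int size;
private final String blockBufferType;

public ITestAbfsHugeFiles(int size, String blockBufferType) throws Exception {
  this.size = size;
  this.blockBufferType = blockBufferType;
}

@Override
public void setup() throws Exception {
  // Hypothetical key selecting the DataBlocks buffer mechanism
  // ("disk", "bytebuffer" or "array") before the filesystem is created.
  getRawConfiguration().set("fs.azure.data.blocks.buffer", blockBufferType);
  super.setup();
}
```

   That way the default disk path is no longer the only one exercised by these scale tests.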
##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assume;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
+
+/**
+ * Testing Huge file for AbfsOutputStream.
+ */
+@RunWith(Parameterized.class)
+public class ITestAbfsHugeFiles extends AbstractAbfsScaleTest {
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int EIGHT_MB = 8 * ONE_MB;
+  private final int size;
+
+  @Parameterized.Parameters(name = "Size={0}")
+  public static Iterable<Object[]> sizes() {
+    return Arrays.asList(new Object[][] {
+        { DEFAULT_WRITE_BUFFER_SIZE },
+        { getHugeFileUploadValue() } });
+  }
+
+  public ITestAbfsHugeFiles(int size) throws Exception {
+    this.size = size;
+  }
+
+  /**
+   * Testing Huge files written at once on AbfsOutputStream.
+   */
+  @Test
+  public void testHugeFileWrite() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path filePath = path(getMethodName());
+    final byte[] b = new byte[size];
+    new Random().nextBytes(b);
+    try (FSDataOutputStream out = fs.create(filePath)) {
+      out.write(b);
+    }
+    // Verify correct length was uploaded. Don't want to verify contents
+    // here, as this would increase the test time significantly.
+    assertEquals("Mismatch in content length of file uploaded", size,
+        fs.getFileStatus(filePath).getLen());
+  }
+
+  /**
+   * Testing Huge files written in chunks of 8M in lots of writes.
+   */
+  @Test
+  public void testLotsOfWrites() throws IOException {
+    assume("If the size isn't a multiple of 8M this test would not pass, so " +
+        "skip", size % EIGHT_MB == 0);
+    AzureBlobFileSystem fs = getFileSystem();
+    Path filePath = path(getMethodName());
+    final byte[] b = new byte[size];
+    new Random().nextBytes(b);
+    try (FSDataOutputStream out = fs.create(filePath)) {
+      int offset = 0;
+      for (int i = 0; i < size / EIGHT_MB; i++) {
+        out.write(b, offset, EIGHT_MB);
+        offset += EIGHT_MB;
+      }
+    }
+    LOG.info(String.valueOf(size % EIGHT_MB));

Review comment:
   Add a message to this log statement; as written, it is unclear what is being logged here.
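   For illustration, a message along these lines would make the intent explicit, assuming the value of interest is the remainder that the assume() above already checks:

```java
// Log what the number means rather than a bare value; the remainder is
// always zero here because of the assume() guard at the top of the test.
LOG.info("Remainder of size {} modulo the 8MB chunk size: {}",
    size, size % EIGHT_MB);
```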
##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * A class to provide disk, byteBuffer and byteArray option for Filesystem
+ * OutputStreams.
+ * <ul>
+ * <li>
+ * Disk: Uses Disk space to write the blocks. Is suited best to avoid
+ * OutOfMemory Errors in Java heap space.
+ * </li>
+ * <li>
+ * byteBuffer: Uses DirectByteBuffer to allocate memory off-heap to
+ * provide faster writing of DataBlocks with some risk of running
+ * OutOfMemory.
+ * </li>
+ * <li>
+ * byteArray: Uses byte[] to write a block of data. On heap and does have
+ * risk of running OutOfMemory fairly easily.
+ * </li>
+ * </ul>
+ * <p>
+ * Implementation of DataBlocks taken from HADOOP-13560 to support huge file
+ * uploads in S3A with different options rather than one.
+ */
+public final class DataBlocks {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataBlocks.class);
+
+  /**
+   * Buffer blocks to disk.
+   * Capacity is limited to available disk space.
+   */
+  public static final String DATA_BLOCKS_BUFFER_DISK = "disk";
+
+  /**
+   * Use a byte buffer.
+   */
+  public static final String DATA_BLOCKS_BYTEBUFFER = "bytebuffer";
+
+  /**
+   * Use an in-memory array. Fast but will run of heap rapidly.
+   */
+  public static final String DATA_BLOCKS_BUFFER_ARRAY = "array";
+
+  private DataBlocks() {
+  }
+
+  /**
+   * Validate args to a write command. These are the same validation checks
+   * expected for any implementation of {@code OutputStream.write()}.
+   *
+   * @param b byte array containing data.
+   * @param off offset in array where to start.
+   * @param len number of bytes to be written.
+   * @throws NullPointerException for a null buffer
+   * @throws IndexOutOfBoundsException if indices are out of range
+   */
+  public static void validateWriteArgs(byte[] b, int off, int len)
+      throws IOException {
+    Preconditions.checkNotNull(b);
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException(
+          "write (b[" + b.length + "], " + off + ", " + len + ')');
+    }
+  }
+
+  /**
+   * Create a factory.
+   *
+   * @param keyToBufferDir Key to buffer directory config for a FS.
+   * @param configuration factory configurations.
+   * @param name factory name -the option from {@link CommonConfigurationKeys}.
+   * @return the factory, ready to be initialized.
+   * @throws IllegalArgumentException if the name is unknown.
+   */
+  public static BlockFactory createFactory(String keyToBufferDir,
+      Configuration configuration,
+      String name) {
+    LOG.debug("Creating DataFactory of type : {}", name);
+    switch (name) {
+    case DATA_BLOCKS_BUFFER_ARRAY:
+      return new ArrayBlockFactory(keyToBufferDir, configuration);
+    case DATA_BLOCKS_BUFFER_DISK:
+      return new DiskBlockFactory(keyToBufferDir, configuration);
+    case DATA_BLOCKS_BYTEBUFFER:
+      return new ByteBufferBlockFactory(keyToBufferDir, configuration);
+    default:
+      throw new IllegalArgumentException("Unsupported block buffer" +
+          " \"" + name + '"');
+    }
+  }
+
+  /**
+   * The output information for an upload.
+   * It can be one of a file, an input stream or a byteArray.
+   * {@link #toByteArray()} method to be used to convert the data into byte
+   * array to be doen in this class as well.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  public static final class BlockUploadData implements Closeable {

Review comment:
   I wonder about this every time I look at this class: why isn't it done with an abstract class and three implementations, one each for file, array, and disk? @steveloughran
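   To illustrate the suggestion: a minimal sketch of BlockUploadData as an abstract class with one subclass per source, following the javadoc above (a file, an input stream, or a byte array). Names and the method set are illustrative only, not taken from the patch:

```java
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;

import static java.util.Objects.requireNonNull;

abstract class BlockUploadData implements Closeable {
  /** Convert the upload source to a byte array. */
  abstract byte[] toByteArray() throws IOException;

  @Override
  public void close() throws IOException {
    // Default: nothing to release; the stream-backed subclass overrides this.
  }
}

/** Upload source is a file; closing leaves the file untouched. */
final class FileUploadData extends BlockUploadData {
  private final File file;

  FileUploadData(File file) {
    this.file = requireNonNull(file);
  }

  @Override
  byte[] toByteArray() throws IOException {
    return FileUtils.readFileToByteArray(file);
  }
}

/** Upload source is a stream; closing closes the stream. */
final class StreamUploadData extends BlockUploadData {
  private final InputStream stream;

  StreamUploadData(InputStream stream) {
    this.stream = requireNonNull(stream);
  }

  @Override
  byte[] toByteArray() throws IOException {
    return IOUtils.toByteArray(stream);
  }

  @Override
  public void close() throws IOException {
    stream.close();
  }
}

/** Upload source is already a byte array. */
final class ByteArrayUploadData extends BlockUploadData {
  private final byte[] bytes;

  ByteArrayUploadData(byte[] bytes) {
    this.bytes = requireNonNull(bytes);
  }

  @Override
  byte[] toByteArray() {
    return bytes;
  }
}
```

   The trade-off is three small classes instead of one class with nullable fields; callers that need the underlying File directly (e.g. for a file-backed PUT) would still need an accessor on the file-backed subclass.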
##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assume;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
+
+/**
+ * Testing Huge file for AbfsOutputStream.
+ */
+@RunWith(Parameterized.class)
+public class ITestAbfsHugeFiles extends AbstractAbfsScaleTest {
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int EIGHT_MB = 8 * ONE_MB;
+  private final int size;
+
+  @Parameterized.Parameters(name = "Size={0}")
+  public static Iterable<Object[]> sizes() {
+    return Arrays.asList(new Object[][] {
+        { DEFAULT_WRITE_BUFFER_SIZE },
+        { getHugeFileUploadValue() } });
+  }
+
+  public ITestAbfsHugeFiles(int size) throws Exception {
+    this.size = size;
+  }
+
+  /**
+   * Testing Huge files written at once on AbfsOutputStream.
+   */
+  @Test
+  public void testHugeFileWrite() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path filePath = path(getMethodName());
+    final byte[] b = new byte[size];
+    new Random().nextBytes(b);
+    try (FSDataOutputStream out = fs.create(filePath)) {
+      out.write(b);
+    }
+    // Verify correct length was uploaded. Don't want to verify contents
+    // here, as this would increase the test time significantly.
+    assertEquals("Mismatch in content length of file uploaded", size,

Review comment:
   Can't we verify just the checksum of the file?
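   For illustration, one option is comparing digests rather than byte arrays. FileSystem.getFileChecksum() may return null for stores that do not implement it, so a sketch that hashes the generated buffer against a read-back of the file (this still downloads the file once, so the test-time concern in the comment above still applies):

```java
// Sketch: compare an MD5 of the local data with an MD5 of the uploaded file.
// Assumes the test method declares "throws Exception" to cover
// NoSuchAlgorithmException from MessageDigest.getInstance().
byte[] expected = MessageDigest.getInstance("MD5").digest(b);

MessageDigest actual = MessageDigest.getInstance("MD5");
byte[] buffer = new byte[64 * 1024];
try (InputStream in = fs.open(filePath)) {
  int bytesRead;
  while ((bytesRead = in.read(buffer)) >= 0) {
    actual.update(buffer, 0, bytesRead); // digest only the bytes actually read
  }
}
assertArrayEquals("Checksum mismatch in uploaded file",
    expected, actual.digest());
```

   (Needs java.security.MessageDigest, java.io.InputStream and org.junit.Assert.assertArrayEquals in the imports.)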
