This is an automated email from the ASF dual-hosted git repository. jinglun pushed a commit to branch HADOOP-19236 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 43ed890e453191d2a08d2d31a535019a0760d946 Author: lijinglun <lijing...@bytedance.com> AuthorDate: Mon Oct 21 14:51:33 2024 +0800 Integration of TOS: Add test cases of ObjectOutputStream, MagicOutputStream, FSUtils. --- .../fs/tosfs/commit/TestMagicOutputStream.java | 196 ++++++++++ .../hadoop/fs/tosfs/object/ObjectTestUtils.java | 118 ++++++ .../fs/tosfs/object/TestObjectOutputStream.java | 407 +++++++++++++++++++++ .../org/apache/hadoop/fs/tosfs/util/TempFiles.java | 98 +++++ .../apache/hadoop/fs/tosfs/util/TestFSUtils.java | 66 ++++ 5 files changed, 885 insertions(+) diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java new file mode 100644 index 00000000000..0adba267667 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.common.ThreadPools; +import org.apache.hadoop.fs.tosfs.object.MultipartUpload; +import org.apache.hadoop.fs.tosfs.object.ObjectStorageTestBase; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.fs.tosfs.object.Part; +import org.apache.hadoop.fs.tosfs.object.staging.StagingPart; +import org.apache.hadoop.fs.tosfs.object.staging.State; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.concurrent.ExecutorService; + +public class TestMagicOutputStream extends ObjectStorageTestBase { + + private static ExecutorService threadPool; + + @BeforeClass + public static void beforeClass() { + threadPool = ThreadPools.newWorkerPool("TestMagicOutputStream-pool"); + } + + @AfterClass + public static void afterClass() { + if (!threadPool.isShutdown()) { + threadPool.shutdown(); + } + } + + private static Path path(String p) { + return new Path(p); + } + + private static Path path(Path parent, String child) { + return new Path(parent, child); + } + + @Test + public void testCreateDestKey() { + Object[][] testCases = new Object[][]{ + new Object[]{path("tos://bucket/__magic/a.txt"), "a.txt"}, + new Object[]{path("tos://bucket/output/__magic/job-1/tasks/tasks-attempt-0/a.txt"), "output/a.txt"}, + new Object[]{path("tos://bucket/__magic/job0/task0/__base/a.txt"), "a.txt"}, + new Object[]{path("tos://bucket/output/__magic/job0/task0/__base/part/part-m-1000"), "output/part/part-m-1000"}, + new Object[]{path("tos://bucket/a/b/c/__magic/__base/d/e/f"), "a/b/c/d/e/f"}, + new Object[]{path("tos://bucket/a/b/c/__magic/d/e/f"), "a/b/c/f"}, + }; + + for (Object[] input : testCases) { + String actualDestKey = MagicOutputStream.toDestKey((Path) input[0]); + Assert.assertEquals("Unexpected destination key.", input[1], actualDestKey); + } + } + + @Test + public void testNonMagicPath() { + try (MagicOutputStream ignored = new TestingMagicOutputStream(path(testDir, "non-magic"))) { + Assert.fail("Cannot create magic output stream for non-magic path"); + } catch (Exception ignored) { + } + } + + @Test + public void testWriteZeroByte() throws IOException { + Path magic = path(path(testDir, CommitUtils.MAGIC), "zero-byte.txt"); + MagicOutputStream out = new TestingMagicOutputStream(magic); + // write zero-byte and close.
+ out.close(); + assertStagingFiles(0, out.stagingParts()); + + // Read and validate the .pending contents + try (InputStream in = storage.get(out.pendingKey()).stream()) { + byte[] data = IOUtils.toByteArray(in); + Pending commit = Pending.deserialize(data); + Assert.assertEquals(storage.bucket().name(), commit.bucket()); + Assert.assertEquals(out.destKey(), commit.destKey()); + Assert.assertTrue(StringUtils.isNoneEmpty(commit.uploadId())); + Assert.assertTrue(commit.createdTimestamp() > 0); + Assert.assertEquals(1, commit.parts().size()); + Assert.assertEquals(0, commit.length()); + Assert.assertEquals(out.upload().uploadId(), commit.uploadId()); + } + } + + public void testWrite(int len) throws IOException { + Path magic = path(path(testDir, CommitUtils.MAGIC), len + ".txt"); + int uploadPartSize = 8 << 20; + int partNum = (len - 1) / (8 << 20) + 1; + + MagicOutputStream out = new TestingMagicOutputStream(magic); + byte[] data = TestUtility.rand(len); + out.write(data); + out.close(); + + assertStagingFiles(partNum, out.stagingParts()); + Assert.assertEquals(ObjectUtils.pathToKey(magic) + CommitUtils.PENDING_SUFFIX, out.pendingKey()); + + Pending commit; + try (InputStream in = storage.get(out.pendingKey()).stream()) { + byte[] serializedData = IOUtils.toByteArray(in); + commit = Pending.deserialize(serializedData); + Assert.assertEquals(storage.bucket().name(), commit.bucket()); + Assert.assertEquals(out.destKey(), commit.destKey()); + Assert.assertTrue(commit.createdTimestamp() > 0); + Assert.assertEquals(len, commit.length()); + Assert.assertEquals(out.upload().uploadId(), commit.uploadId()); + // Verify the upload part list. + Assert.assertEquals(partNum, commit.parts().size()); + if (!commit.parts().isEmpty()) { + for (int i = 0; i < partNum - 1; i += 1) { + Assert.assertEquals(uploadPartSize, commit.parts().get(i).size()); + } + Part lastPart = commit.parts().get(partNum - 1); + Assert.assertTrue(lastPart.size() > 0 && lastPart.size() <= uploadPartSize); + } + } + + // List multipart uploads + int uploadsNum = 0; + for (MultipartUpload upload : storage.listUploads(out.destKey())) { + uploadsNum += 1; + Assert.assertEquals(out.upload(), upload); + } + Assert.assertEquals(1L, uploadsNum); + + // The target object is still not visible for object storage. + Assert.assertNull(storage.head(out.destKey())); + + // Complete the upload and validate the content. 
+ storage.completeUpload(out.destKey(), out.upload().uploadId(), commit.parts()); + try (InputStream in = storage.get(out.destKey()).stream()) { + Assert.assertArrayEquals(data, IOUtils.toByteArray(in)); + } + } + + @Test + public void testWrite1MB() throws IOException { + testWrite(1 << 20); + } + + @Test + public void testWrite24MB() throws IOException { + testWrite(24 << 20); + } + + @Test + public void testWrite100MB() throws IOException { + testWrite(100 << 20); + } + + private static void assertStagingFiles(int expectedNum, List<StagingPart> stagings) { + Assert.assertEquals(expectedNum, stagings.size()); + for (StagingPart staging : stagings) { + Assert.assertEquals(State.CLEANED, staging.state()); + } + } + + private class TestingMagicOutputStream extends MagicOutputStream { + + TestingMagicOutputStream(Path magic) { + super(fs, storage, threadPool, protonConf, magic); + } + + protected void persist(Path p, byte[] data) { + storage().put(ObjectUtils.pathToKey(p), data); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectTestUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectTestUtils.java new file mode 100644 index 00000000000..b1c8c45bcd9 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectTestUtils.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.Objects; + +public class ObjectTestUtils { + + public static final byte[] EMPTY_BYTES = new byte[]{}; + + private ObjectTestUtils() { + } + + /** + * Assert that all the parent directories exist. + * + * @param path to validate, can be directory or file. + */ + public static void assertParentDirExist(Path path) throws IOException { + for (Path p = path.getParent(); p != null && p.getParent() != null; p = p.getParent()) { + assertObject(p, EMPTY_BYTES, true); + } + } + + /** + * Assert that all the parent directories and the current directory exist. + * + * @param path to validate, must be a directory. + */ + public static void assertDirExist(Path path) throws IOException { + // All parent directories exist. + assertParentDirExist(path); + // The current directory exists.
+ assertObject(path, EMPTY_BYTES, true); + } + + public static void assertObjectNotExist(Path path) throws IOException { + assertObjectNotExist(path, false); + } + + public static void assertObjectNotExist(Path path, boolean isDir) throws IOException { + ObjectStorage store = ObjectStorageFactory.create( + path.toUri().getScheme(), path.toUri().getHost(), new Configuration()); + String objectKey = ObjectUtils.pathToKey(path, isDir); + ObjectInfo info = store.head(objectKey); + Assert.assertNull(String.format("Object key %s shouldn't exist in backend storage.", objectKey), info); + + store.close(); + } + + public static void assertObject(Path path, byte[] data) throws IOException { + assertObject(path, data, false); + } + + public static void assertObject(Path path, byte[] data, boolean isDir) throws IOException { + ObjectStorage store = ObjectStorageFactory.create( + path.toUri().getScheme(), path.toUri().getHost(), new Configuration()); + String objectKey = ObjectUtils.pathToKey(path, isDir); + // Verify the existence of the object. + ObjectInfo info = store.head(objectKey); + Assert.assertNotNull(String.format("there should be a key %s in object storage", objectKey), info); + Assert.assertEquals(objectKey, info.key()); + Assert.assertEquals(data.length, info.size()); + // Verify the data content. + try (InputStream in = store.get(objectKey, 0, -1).stream()) { + byte[] actual = IOUtils.toByteArray(in); + Assert.assertArrayEquals("Unexpected binary", data, actual); + } + + store.close(); + } + + public static void assertMultipartUploadExist(Path path, String uploadId) throws IOException { + ObjectStorage store = ObjectStorageFactory.create( + path.toUri().getScheme(), path.toUri().getHost(), new Configuration()); + String objectKey = ObjectUtils.pathToKey(path, false); + + Iterator<MultipartUpload> uploadIterator = store.listUploads(objectKey).iterator(); + Assert.assertTrue(uploadIterator.hasNext()); + assertMultipartUploadIdExist(uploadIterator, uploadId); + + store.close(); + } + + private static void assertMultipartUploadIdExist(Iterator<MultipartUpload> uploadIterator, String uploadId) { + boolean exist = false; + while (uploadIterator.hasNext()) { + if (Objects.equals(uploadIterator.next().uploadId(), uploadId)) { + exist = true; + } + } + Assert.assertTrue(exist); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java new file mode 100644 index 00000000000..d2b08caa14b --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.common.ThreadPools; +import org.apache.hadoop.fs.tosfs.conf.ConfKeys; +import org.apache.hadoop.fs.tosfs.object.staging.StagingPart; +import org.apache.hadoop.fs.tosfs.object.staging.State; +import org.apache.hadoop.fs.tosfs.util.FSUtils; +import org.apache.hadoop.fs.tosfs.util.TempFiles; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TestObjectOutputStream extends ObjectStorageTestBase { + + private static ExecutorService threadPool; + + @BeforeClass + public static void beforeClass() { + threadPool = ThreadPools.newWorkerPool("TestObjectOutputStream-pool"); + } + + @AfterClass + public static void afterClass() { + if (!threadPool.isShutdown()) { + threadPool.shutdown(); + } + } + + @Test + public void testMkStagingDir() throws ExecutionException, InterruptedException, IOException { + try (TempFiles tmp = TempFiles.of()) { + List<String> tmpDirs = Lists.newArrayList(); + for (int i = 0; i < 3; i++) { + tmpDirs.add(tmp.newDir()); + } + Configuration newConf = new Configuration(protonConf); + newConf.set(ConfKeys.MULTIPART_STAGING_DIR.format("filestore"), Joiner.on(",").join(tmpDirs)); + + // Start multiple threads to open streams to create staging dir. + List<Future<ObjectOutputStream>> futures = Collections.synchronizedList(new ArrayList<>()); + for (int i = 0; i < 10; i++) { + futures.add(threadPool.submit(() -> + new ObjectOutputStream(storage, threadPool, newConf, path("none.txt"), true))); + } + for (Future<ObjectOutputStream> f : futures) { + f.get().close(); + } + } + } + + @Test + public void testWriteZeroByte() throws IOException { + Path zeroByteTxt = path("zero-byte.txt"); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, zeroByteTxt, true); + // write zero-byte and close. + out.write(new byte[0], 0, 0); + out.close(); + assertStagingPart(0, out.stagingParts()); + + // Read and validate the dest object contents + ObjectTestUtils.assertObject(zeroByteTxt, ObjectTestUtils.EMPTY_BYTES); + } + + @Test + public void testWriteZeroByteWithoutAllowPut() throws IOException { + Path zeroByteTxt = path("zero-byte-without-allow-put.txt"); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, zeroByteTxt, false); + // write zero-byte and close. + out.close(); + assertStagingPart(0, out.stagingParts()); + + // Read and validate the dest object content. 
+ ObjectTestUtils.assertObject(zeroByteTxt, ObjectTestUtils.EMPTY_BYTES); + } + + @Test + public void testDeleteStagingFileWhenUploadPartsOK() throws IOException { + Path path = path("data.txt"); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, path, true); + byte[] data = TestUtility.rand((int) (ConfKeys.MULTIPART_SIZE_DEFAULT * 2)); + out.write(data); + out.waitForPartsUpload(); + for (StagingPart part : out.stagingParts()) { + Assert.assertEquals(State.CLEANED, part.state()); + } + out.close(); + for (StagingPart part : out.stagingParts()) { + Assert.assertEquals(State.CLEANED, part.state()); + } + } + + @Test + public void testDeleteStagingFileWithClose() throws IOException { + Path path = path("data.txt"); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, path, true); + byte[] data = TestUtility.rand((int) (ConfKeys.MULTIPART_SIZE_DEFAULT * 2)); + out.write(data); + out.close(); + for (StagingPart part : out.stagingParts()) { + Assert.assertEquals(State.CLEANED, part.state()); + } + } + + @Test + public void testDeleteSimplePutStagingFile() throws IOException { + Path smallTxt = path("small.txt"); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, smallTxt, true); + byte[] data = TestUtility.rand(4 << 20); + out.write(data); + for (StagingPart part : out.stagingParts()) { + Assert.assertTrue(part.size() > 0); + } + out.close(); + for (StagingPart part : out.stagingParts()) { + Assert.assertEquals(State.CLEANED, part.state()); + } + } + + @Test + public void testSimplePut() throws IOException { + Path smallTxt = path("small.txt"); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, smallTxt, true); + byte[] data = TestUtility.rand(4 << 20); + out.write(data); + out.close(); + assertStagingPart(1, out.stagingParts()); + assertNull("Should use the simple PUT to upload object for small file.", out.upload()); + + // Read and validate the dest object content. 
+ ObjectTestUtils.assertObject(smallTxt, data); + } + + public void testWrite(int uploadPartSize, int len) throws IOException { + Configuration newConf = new Configuration(protonConf); + newConf.setLong(ConfKeys.MULTIPART_SIZE.format(FSUtils.scheme(conf, testDir.toUri())), + uploadPartSize); + + Path outPath = path(len + ".txt"); + int partNum = (len - 1) / uploadPartSize + 1; + + byte[] data = TestUtility.rand(len); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, newConf, outPath, true); + try { + out.write(data); + } finally { + out.close(); + } + + assertStagingPart(partNum, out.stagingParts()); + ObjectTestUtils.assertObject(outPath, data); + + // List multipart uploads + int uploadsNum = 0; + for (MultipartUpload ignored : storage.listUploads(out.destKey())) { + uploadsNum += 1; + } + Assert.assertEquals(0L, uploadsNum); + } + + @Test + public void testParallelWriteOneOutPutStream() throws IOException, ExecutionException, InterruptedException { + testParallelWriteOneOutPutStreamImpl(5 << 20, 10, 128); + testParallelWriteOneOutPutStreamImpl(5 << 20, 10, 1 << 20); + testParallelWriteOneOutPutStreamImpl(5 << 20, 10, 2 << 20); + testParallelWriteOneOutPutStreamImpl(5 << 20, 10, 6 << 20); + } + + public void testParallelWriteOneOutPutStreamImpl(int partSize, int epochs, int batchSize) + throws IOException, ExecutionException, InterruptedException { + Configuration newConf = new Configuration(protonConf); + newConf.setLong(ConfKeys.MULTIPART_SIZE.format(FSUtils.scheme(conf, testDir.toUri())), + partSize); + + String file = String.format("%d-%d-%d-testParallelWriteOneOutPutStream.txt", partSize, epochs, batchSize); + Path outPath = path(file); + try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, newConf, outPath, true)) { + List<Future<?>> futures = new ArrayList<>(); + for (int i = 0; i < epochs; i++) { + final int index = i; + futures.add(threadPool.submit(() -> { + try { + out.write(dataset(batchSize, index)); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + // wait for all tasks finished + for (Future<?> future : futures) { + future.get(); + } + } + + try (InputStream inputStream = storage.get(ObjectUtils.pathToKey(outPath)).stream()) { + List<byte[]> ret = new ArrayList<>(); + byte[] data = new byte[batchSize]; + while (inputStream.read(data) != -1) { + ret.add(data); + data = new byte[batchSize]; + } + + assertEquals(epochs, ret.size()); + List<byte[]> sortedRet = ret.stream() + .sorted(Comparator.comparingInt(o -> o[0])) + .collect(Collectors.toList()); + + int j = 0; + for (byte[] e : sortedRet) { + assertArrayEquals(dataset(batchSize, j), e); + j++; + } + } + } + + public static byte[] dataset(int len, int base) { + byte[] dataset = new byte[len]; + for (int i = 0; i < len; i++) { + dataset[i] = (byte) (base); + } + return dataset; + } + + @Test + public void testWrite1MB() throws IOException { + testWrite(5 << 20, 1 << 20); + testWrite(8 << 20, 1 << 20); + testWrite(16 << 20, 1 << 20); + } + + @Test + public void testWrite24MB() throws IOException { + testWrite(5 << 20, 24 << 20); + testWrite(8 << 20, 24 << 20); + testWrite(16 << 20, 24 << 20); + } + + @Test + public void testWrite100MB() throws IOException { + testWrite(5 << 20, 100 << 20); + testWrite(8 << 20, 100 << 20); + testWrite(16 << 20, 100 << 20); + } + + private void testMultipartThreshold(int partSize, int multipartThreshold, int dataSize) throws IOException { + Configuration newConf = new Configuration(protonConf); + 
newConf.setLong(ConfKeys.MULTIPART_SIZE.format(scheme), partSize); + newConf.setLong(ConfKeys.MULTIPART_THRESHOLD.format(scheme), multipartThreshold); + Path outPath = path(String.format("threshold-%d-%d-%d.txt", partSize, multipartThreshold, dataSize)); + + byte[] data = TestUtility.rand(dataSize); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, newConf, outPath, true); + try { + // Verify for every 1MB data writing, unless reaching the threshold. + int upperLimit = Math.min(multipartThreshold, dataSize); + int curOff = 0; + for (; curOff < upperLimit; curOff += (1 << 20)) { + int end = Math.min(curOff + (1 << 20), upperLimit); + out.write(Arrays.copyOfRange(data, curOff, end)); + + List<MultipartUpload> uploads = Lists.newArrayList(storage.listUploads(out.destKey())); + if (end < multipartThreshold) { + Assert.assertEquals("Shouldn't has any uploads because it just use simple PUT", 0, uploads.size()); + } else { + Assert.assertEquals("Switch to use MPU.", 1, uploads.size()); + } + assertEquals((end - 1) / partSize + 1, out.stagingParts().size()); + } + + // Verify for every 1MB data writing, unless reaching the data size. + for (; curOff < dataSize; curOff += (1 << 20)) { + int end = Math.min(curOff + (1 << 20), dataSize); + out.write(Arrays.copyOfRange(data, curOff, end)); + + List<MultipartUpload> uploads = Lists.newArrayList(storage.listUploads(out.destKey())); + Assert.assertEquals(1, uploads.size()); + assertEquals(out.destKey(), uploads.get(0).key()); + assertEquals((end - 1) / partSize + 1, out.stagingParts().size()); + } + } finally { + out.close(); + } + + assertStagingPart((dataSize - 1) / partSize + 1, out.stagingParts()); + ObjectTestUtils.assertObject(outPath, data); + + List<MultipartUpload> uploads = Lists.newArrayList(storage.listUploads(out.destKey())); + Assert.assertEquals(0, uploads.size()); + } + + @Test + public void testMultipartThreshold2MB() throws IOException { + testMultipartThreshold(5 << 20, 2 << 20, 1 << 20); + testMultipartThreshold(5 << 20, 2 << 20, (2 << 20) - 1); + testMultipartThreshold(5 << 20, 2 << 20, 2 << 20); + testMultipartThreshold(5 << 20, 2 << 20, 4 << 20); + testMultipartThreshold(5 << 20, 2 << 20, 5 << 20); + testMultipartThreshold(5 << 20, 2 << 20, (5 << 20) + 1); + testMultipartThreshold(5 << 20, 2 << 20, 6 << 20); + testMultipartThreshold(5 << 20, 2 << 20, 10 << 20); + testMultipartThreshold(5 << 20, 2 << 20, 20 << 20); + } + + @Test + public void testMultipartThreshold5MB() throws IOException { + testMultipartThreshold(5 << 20, 5 << 20, 1 << 20); + testMultipartThreshold(5 << 20, 5 << 20, 4 << 20); + testMultipartThreshold(5 << 20, 5 << 20, 5 << 20); + testMultipartThreshold(5 << 20, 5 << 20, 5 << 20); + testMultipartThreshold(5 << 20, 5 << 20, 6 << 20); + testMultipartThreshold(5 << 20, 5 << 20, 10 << 20); + testMultipartThreshold(5 << 20, 5 << 20, 20 << 20); + } + + @Test + public void testMultipartThreshold10MB() throws IOException { + testMultipartThreshold(5 << 20, 10 << 20, 1 << 20); + testMultipartThreshold(5 << 20, 10 << 20, 10 << 20); + testMultipartThreshold(5 << 20, 10 << 20, 11 << 20); + testMultipartThreshold(5 << 20, 10 << 20, 15 << 20); + testMultipartThreshold(5 << 20, 10 << 20, 20 << 20); + testMultipartThreshold(5 << 20, 10 << 20, 40 << 20); + testMultipartThreshold(5 << 20, 10 << 20, 30 << 20); + } + + @Test + public void testCloseStreamTwice() throws IOException { + int len = 100; + Path outPath = path(len + ".txt"); + int partNum = 1; + + byte[] data = TestUtility.rand(len); + ObjectOutputStream 
out = new ObjectOutputStream(storage, threadPool, protonConf, outPath, true); + try { + out.write(data); + out.close(); + } finally { + out.close(); + } + + assertStagingPart(partNum, out.stagingParts()); + ObjectTestUtils.assertObject(outPath, data); + } + + @Test + public void testWriteClosedStream() throws IOException { + byte[] data = TestUtility.rand(10); + Path outPath = path("testWriteClosedStream.txt"); + try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, outPath, true)) { + out.close(); + out.write(data); + } catch (IllegalStateException e) { + assertEquals("OutputStream is closed.", e.getMessage()); + } + } + + private static void assertStagingPart(int expectedNum, List<StagingPart> parts) { + Assert.assertEquals(expectedNum, parts.size()); + for (StagingPart part : parts) { + Assert.assertTrue(part.size() > 0); + } + } + + private Path path(String name) { + return new Path(testDir, name); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TempFiles.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TempFiles.java new file mode 100644 index 00000000000..81b59a65659 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TempFiles.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.tosfs.util; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; + +public class TempFiles implements Closeable { + private final List<String> files = Lists.newArrayList(); + private final List<String> dirs = Lists.newArrayList(); + + private TempFiles() { + } + + public static TempFiles of() { + return new TempFiles(); + } + + public String newFile() { + String p = newTempFile(); + files.add(p); + return p; + } + + public String newDir() { + return newDir(null); + } + + public String newDir(String prefix) { + String p = newTempDir(prefix); + dirs.add(p); + return p; + } + + @Override + public void close() { + files.forEach(file -> CommonUtils.runQuietly(() -> TempFiles.deleteFile(file))); + files.clear(); + dirs.forEach(dir -> CommonUtils.runQuietly(() -> TempFiles.deleteDir(dir))); + dirs.clear(); + } + + public static String newTempFile() { + return String.join(File.separator, newTempDir(), UUIDUtils.random()); + } + + public static String newTempDir() { + return newTempDir(null); + } + + public static String newTempDir(String prefix) { + try { + return Files.createTempDirectory(prefix).toFile().getAbsolutePath(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static void deleteFile(String path) { + try { + Files.deleteIfExists(Paths.get(path)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static void deleteDir(String path) { + try { + FileUtils.deleteDirectory(new File(path)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestFSUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestFSUtils.java new file mode 100644 index 00000000000..9dca0cf571e --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestFSUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.tosfs.util; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import java.net.URI; +import java.net.URISyntaxException; + +import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TestFSUtils { + @Test + public void testNormalizeURI() throws URISyntaxException { + URI uri = new URI("tos://abc/dir/key"); + URI normalizeURI = FSUtils.normalizeURI(uri, new Configuration()); + assertEquals("tos", normalizeURI.getScheme()); + assertEquals("abc", normalizeURI.getAuthority()); + assertEquals("abc", normalizeURI.getHost()); + assertEquals("/dir/key", normalizeURI.getPath()); + + uri = new URI("/abc/dir/key"); + normalizeURI = FSUtils.normalizeURI(uri, new Configuration()); + assertNull(uri.getScheme()); + assertEquals("file", normalizeURI.getScheme()); + assertNull(uri.getAuthority()); + assertNull(normalizeURI.getAuthority()); + assertEquals("/abc/dir/key", uri.getPath()); + assertEquals("/", normalizeURI.getPath()); + + uri = new URI("tos:///abc/dir/key"); + normalizeURI = FSUtils.normalizeURI(uri, new Configuration()); + assertEquals("tos", uri.getScheme()); + assertNull(uri.getAuthority()); + assertEquals("/abc/dir/key", uri.getPath()); + assertEquals("tos", normalizeURI.getScheme()); + assertNull(normalizeURI.getAuthority()); + assertEquals("/abc/dir/key", normalizeURI.getPath()); + + Configuration conf = new Configuration(); + conf.set(FS_DEFAULT_NAME_KEY, "tos://bucket/"); + normalizeURI = FSUtils.normalizeURI(uri, conf); + assertEquals("tos", normalizeURI.getScheme()); + assertEquals("bucket", normalizeURI.getAuthority()); + assertEquals("/", normalizeURI.getPath()); + } +}
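
[Editor's note] For readers skimming the patch, the write-then-read round trip that TestObjectOutputStream and ObjectTestUtils exercise is summarized in the minimal sketch below. It is not part of the commit and only reuses calls that appear in the diff above; the "example-bucket" name, the destination path, and the wrapper class are illustrative placeholders, and it assumes an ObjectStorage backend is configured for the "filestore" scheme as in the tests.

    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.tosfs.common.ThreadPools;
    import org.apache.hadoop.fs.tosfs.object.ObjectOutputStream;
    import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
    import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
    import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
    import org.apache.hadoop.fs.tosfs.util.TestUtility;
    import org.junit.Assert;

    import java.io.InputStream;
    import java.util.concurrent.ExecutorService;

    // Illustrative sketch only; names marked as placeholders are not from the patch.
    public class ObjectOutputStreamRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder scheme/bucket; the tests obtain these from ObjectStorageTestBase.
        ObjectStorage storage = ObjectStorageFactory.create("filestore", "example-bucket", conf);
        ExecutorService pool = ThreadPools.newWorkerPool("round-trip-sketch-pool");
        try {
          Path dest = new Path("filestore://example-bucket/example/data.txt");
          byte[] data = TestUtility.rand(4 << 20); // 4 MiB of random test data

          // allowPut = true lets a small object go through a simple PUT instead of
          // a multipart upload, which is what testSimplePut() verifies.
          try (ObjectOutputStream out =
              new ObjectOutputStream(storage, pool, conf, dest, true)) {
            out.write(data);
          }

          // Read the object back through the storage client and compare,
          // mirroring ObjectTestUtils.assertObject().
          try (InputStream in = storage.get(ObjectUtils.pathToKey(dest)).stream()) {
            Assert.assertArrayEquals(data, IOUtils.toByteArray(in));
          }
        } finally {
          pool.shutdown();
          storage.close();
        }
      }
    }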