This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch HADOOP-19236-original
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit c7cd0ba4d699c003ed15c9777e629b6f87723107 Author: lijinglun <lijing...@bytedance.com> AuthorDate: Tue Oct 29 21:39:42 2024 +0800 Integration of TOS: Add Committer test. --- .../org/apache/hadoop/fs/tosfs/RawFileSystem.java | 2 +- .../apache/hadoop/fs/tosfs/commit/CommitUtils.java | 2 +- .../hadoop/fs/tosfs/commit/CommitterFactory.java | 33 ++ .../hadoop/fs/tosfs/commit/mapred/Committer.java | 4 +- .../apache/hadoop/fs/tosfs/common/ThreadPools.java | 4 +- .../org/apache/hadoop/fs/tosfs/conf/TosKeys.java | 19 +- .../org/apache/hadoop/fs/tosfs/object/tos/TOS.java | 4 +- .../apache/hadoop/fs/tosfs/TestTosChecksum.java | 2 +- .../hadoop/fs/tosfs/commit/BaseJobSuite.java | 227 +++++++++++ .../hadoop/fs/tosfs/commit/CommitterTestBase.java | 422 +++++++++++++++++++++ .../apache/hadoop/fs/tosfs/commit/JobSuite.java | 219 +++++++++++ .../hadoop/fs/tosfs/commit/MRJobTestBase.java | 232 +++++++++++ .../hadoop/fs/tosfs/commit/TestCommitter.java | 29 ++ .../apache/hadoop/fs/tosfs/commit/TestMRJob.java | 49 +++ .../fs/tosfs/commit/TestMagicOutputStream.java | 2 +- .../fs/tosfs/commit/mapred/CommitterTestBase.java | 364 ++++++++++++++++++ .../hadoop/fs/tosfs/commit/mapred/JobSuite.java | 228 +++++++++++ .../fs/tosfs/commit/mapred/TestCommitter.java | 29 ++ .../fs/tosfs/object/ObjectStorageTestBase.java | 6 +- .../fs/tosfs/object/TestObjectOutputStream.java | 24 +- 20 files changed, 1866 insertions(+), 35 deletions(-) diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java index 8675ca7b774..b89a1477ee6 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java @@ -178,7 +178,7 @@ public class RawFileSystem extends FileSystem { if (fileStatus == null && FuseUtils.fuseEnabled()) { // The fuse requires the file to be visible when accessing getFileStatus once we created the file, so here we // close and commit the file to be visible explicitly for fuse, and then reopen the file output stream for - // further data bytes writing. For more details please see: https://code.byted.org/emr/proton/issues/825 + // further data bytes writing. 
out.close(); out = new ObjectOutputStream(storage, uploadThreadPool, getConf(), makeQualified(path), true); } diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java index 2ea0277f544..845116344db 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java @@ -329,7 +329,7 @@ public class CommitUtils { void visit(FileStatus f); } - public static boolean supportProtonCommit(Configuration conf, Path outputPath) { + public static boolean supportObjectStorageCommit(Configuration conf, Path outputPath) { return supportSchemes(conf).contains(outputPath.toUri().getScheme()); } diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitterFactory.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitterFactory.java new file mode 100644 index 00000000000..d85b1b5c9af --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitterFactory.java @@ -0,0 +1,33 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; + +import java.io.IOException; + +public class CommitterFactory extends PathOutputCommitterFactory { + + @Override + public PathOutputCommitter createOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + return new Committer(outputPath, context); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java index 9a2603942fd..614ee70ac34 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java @@ -47,7 +47,7 @@ public class Committer extends FileOutputCommitter { private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(JobContext context) throws IOException { if(wrapped == null) { - wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context)) + wrapped = CommitUtils.supportObjectStorageCommit(context.getConfiguration(), getOutputPath(context)) ? 
new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context) : new org.apache.hadoop.mapred.FileOutputCommitter(); LOG.debug("Using OutputCommitter implementation {}", wrapped.getClass().getName()); @@ -64,7 +64,7 @@ public class Committer extends FileOutputCommitter { private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(TaskAttemptContext context) throws IOException { if(wrapped == null) { - wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context)) + wrapped = CommitUtils.supportObjectStorageCommit(context.getConfiguration(), getOutputPath(context)) ? new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context) : new org.apache.hadoop.mapred.FileOutputCommitter(); } diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java index b69dd9c6d7f..df3bc81cd8e 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java @@ -41,12 +41,12 @@ public class ThreadPools { private ThreadPools() { } - public static final String WORKER_THREAD_POOL_SIZE_PROP = "proton.worker.num-threads"; + public static final String WORKER_THREAD_POOL_SIZE_PROP = "tos.worker.num-threads"; public static final int WORKER_THREAD_POOL_SIZE = poolSize(Math.max(2, Runtime.getRuntime().availableProcessors())); - private static final ExecutorService WORKER_POOL = newWorkerPool("proton-default-worker-pool"); + private static final ExecutorService WORKER_POOL = newWorkerPool("tos-default-worker-pool"); public static ExecutorService defaultWorkerPool() { return WORKER_POOL; diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java index 8c16306811e..7697deee00d 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java @@ -109,15 +109,15 @@ public class TosKeys { public static final int FS_TOS_HTTP_CONNECT_TIMEOUT_MILLS_DEFAULT = 10000; /** - * The reading timeout when reading data from tos. Note that it is configured for the tos client, - * not proton. + * The reading timeout when reading data from tos. Note that it is configured for the tos client + * sdk, not hadoop-tos. */ public static final String FS_TOS_HTTP_READ_TIMEOUT_MILLS = "fs.tos.http.readTimeoutMills"; public static final int FS_TOS_HTTP_READ_TIMEOUT_MILLS_DEFAULT = 30000; /** - * The writing timeout when uploading data to tos. Note that it is configured for the tos client, - * not proton. + * The writing timeout when uploading data to tos. Note that it is configured for the tos client + * sdk, not hadoop-tos. */ public static final String FS_TOS_HTTP_WRITE_TIMEOUT_MILLS = "fs.tos.http.writeTimeoutMills"; public static final int FS_TOS_HTTP_WRITE_TIMEOUT_MILLS_DEFAULT = 30000; @@ -152,11 +152,10 @@ public class TosKeys { /** * The prefix will be used as the product name in TOS SDK. The final user agent pattern is - * '{prefix}/Proton/{proton version}'. - * TODO: review it. + * '{prefix}/TOS_FS/{hadoop tos version}'. 
*/ public static final String FS_TOS_USER_AGENT_PREFIX = "fs.tos.user.agent.prefix"; - public static final String FS_TOS_USER_AGENT_PREFIX_DEFAULT = "EMR"; + public static final String FS_TOS_USER_AGENT_PREFIX_DEFAULT = "HADOOP-TOS"; // TOS common keys. /** @@ -187,7 +186,7 @@ public class TosKeys { public static final int FS_TOS_BATCH_DELETE_MAX_RETRIES_DEFAULT = 20; /** - * The codes from TOS deleteMultiObjects response, Proton will resend the batch delete request to + * The codes from TOS deleteMultiObjects response, client will resend the batch delete request to * delete the failed keys again if the response only contains these codes, otherwise won't send * request anymore. */ @@ -213,7 +212,7 @@ public class TosKeys { public static final int FS_TOS_LIST_OBJECTS_COUNT_DEFAULT = 1000; /** - * The maximum retry times of sending request via TOS client, Proton will resend the request if + * The maximum retry times of sending request via TOS client, client will resend the request if * got retryable exceptions, e.g. SocketException, UnknownHostException, SSLException, * InterruptedException, SocketTimeoutException, or got TOO_MANY_REQUESTS, INTERNAL_SERVER_ERROR * http codes. @@ -232,7 +231,7 @@ public class TosKeys { TOSErrorCodes.FAST_FAILURE_CONFLICT_ERROR_CODES; /** - * The maximum retry times of reading object content via TOS client, Proton will resend the + * The maximum retry times of reading object content via TOS client, client will resend the * request to create a new input stream if getting unexpected end of stream error during reading * the input stream. */ diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java index a512e708584..06a52f0f7c7 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java @@ -154,10 +154,10 @@ public class TOS implements DirectoryStorage { private BucketInfo bucketInfo; static { - org.apache.log4j.Logger logger = LogManager.getLogger("io.proton.shaded.com.volcengine.tos"); + org.apache.log4j.Logger logger = LogManager.getLogger("com.volcengine.tos"); String logLevel = System.getProperty("tos.log.level", "WARN"); - LOG.debug("Reset the log level of io.proton.shaded.com.volcengine.tos with {} ", logLevel); + LOG.debug("Reset the log level of com.volcengine.tos with {} ", logLevel); logger.setLevel(Level.toLevel(logLevel.toUpperCase(), Level.WARN)); } diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java index 1192ca5d7ab..938da20e4cb 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java @@ -46,7 +46,7 @@ import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public class TestTosChecksum { - private static final String FILE_STORE_ROOT = TempFiles.newTempDir("TestProtonChecksum"); + private static final String FILE_STORE_ROOT = TempFiles.newTempDir("TestTosChecksum"); private static final String ALGORITHM_NAME = "mock-algorithm"; private static final String PREFIX = UUIDUtils.random(); diff 
--git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/BaseJobSuite.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/BaseJobSuite.java new file mode 100644 index 00000000000..06c561b4cde --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/BaseJobSuite.java @@ -0,0 +1,227 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.object.MultipartUpload; +import org.apache.hadoop.fs.tosfs.object.ObjectInfo; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.fs.tosfs.util.ParseUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public abstract class BaseJobSuite { + private static final Logger LOG = LoggerFactory.getLogger(BaseJobSuite.class); + public static final int DEFAULT_APP_ATTEMPT_ID = 1; + protected static final Text KEY_1 = new Text("key1"); + protected static final Text KEY_2 = new Text("key2"); + protected static final Text VAL_1 = new Text("val1"); + protected static final Text VAL_2 = new Text("val2"); + + protected Job job; + protected String jobId; + protected FileSystem fs; + protected Path outputPath; + protected ObjectStorage storage; + + private final boolean dumpObjectStorage = ParseUtils.envAsBoolean("DUMP_OBJECT_STORAGE", false); + + protected abstract Path magicPartPath(); + + protected abstract Path magicPendingSetPath(); + + protected abstract void assertSuccessMarker() throws IOException; + + protected abstract void assertSummaryReport(Path reportDir) throws IOException; + + protected abstract void assertNoTaskAttemptPath() throws IOException; + + protected void assertMagicPathExist(Path outputPath) throws IOException { + Path magicPath = CommitUtils.magicPath(outputPath); + Assert.assertTrue(String.format("Magic path: %s should exist", magicPath), fs.exists(magicPath)); + } + + protected void assertMagicPathNotExist(Path outputPath) throws IOException { + Path magicPath = CommitUtils.magicPath(outputPath); + Assert.assertFalse(String.format("Magic path: %s should not exist", magicPath), fs.exists(magicPath)); + } + + protected abstract boolean skipTests(); + + public Path magicPendingPath() { + Path magicPart = magicPartPath(); + return 
new Path(magicPart.getParent(), magicPart.getName() + ".pending"); + } + + public Path magicJobPath() { + return CommitUtils.magicPath(outputPath); + } + + public String magicPartKey() { + return ObjectUtils.pathToKey(magicPartPath()); + } + + public String destPartKey() { + return MagicOutputStream.toDestKey(magicPartPath()); + } + + public FileSystem fs() { + return fs; + } + + public ObjectStorage storage() { + return storage; + } + + public Job job() { + return job; + } + + public void assertHasMagicKeys() { + Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), ""); + Assert.assertTrue("Should have some __magic object keys", Iterables.any(objects, o -> o.key().contains( + CommitUtils.MAGIC) && o.key().contains(jobId))); + } + + public void assertHasBaseKeys() { + Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), ""); + Assert.assertTrue("Should have some __base object keys", Iterables.any(objects, o -> o.key().contains( + CommitUtils.BASE) && o.key().contains(jobId))); + } + + public void assertNoMagicPendingFile() { + String magicPendingKey = String.format("%s.pending", magicPartKey()); + Assert.assertNull("Magic pending key should not exist", storage.head(magicPendingKey)); + } + + public void assertHasMagicPendingFile() { + String magicPendingKey = String.format("%s.pending", magicPartKey()); + Assert.assertNotNull("Magic pending key should exist", storage.head(magicPendingKey)); + } + + public void assertNoMagicMultipartUpload() { + Iterable<MultipartUpload> uploads = storage.listUploads(ObjectUtils.pathToKey(magicJobPath(), true)); + boolean anyMagicUploads = Iterables.any(uploads, u -> u.key().contains(CommitUtils.MAGIC)); + Assert.assertFalse("Should have no magic multipart uploads", anyMagicUploads); + } + + public void assertNoMagicObjectKeys() { + Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), ""); + boolean anyMagicUploads = + Iterables.any(objects, o -> o.key().contains(CommitUtils.MAGIC) && o.key().contains(jobId)); + Assert.assertFalse("Should not have any magic keys", anyMagicUploads); + } + + public void assertHasPendingSet() { + Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), ""); + boolean anyPendingSet = + Iterables.any(objects, o -> o.key().contains(CommitUtils.PENDINGSET_SUFFIX) && o.key().contains(jobId)); + Assert.assertTrue("Should have the expected .pendingset file", anyPendingSet); + } + + public void assertPendingSetAtRightLocation() { + Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), ""); + Path magicJobAttemptPath = + CommitUtils.magicJobAttemptPath(job().getJobID().toString(), DEFAULT_APP_ATTEMPT_ID, outputPath); + String inQualifiedPath = magicJobAttemptPath.toUri().getPath().substring(1); + Iterable<ObjectInfo> filtered = + Iterables.filter(objects, o -> o.key().contains(CommitUtils.PENDINGSET_SUFFIX) && o.key().contains(jobId)); + boolean pendingSetAtRightLocation = + Iterables.any(filtered, o -> o.key().startsWith(inQualifiedPath) && o.key().contains(jobId)); + Assert.assertTrue("The .pendingset file should be located at the job's magic output path.", pendingSetAtRightLocation); + } + + public void assertMultipartUpload(int expectedUploads) { + // Note: be careful in the concurrent case: the suites need to check the same output path.
+ Iterable<MultipartUpload> uploads = storage.listUploads(ObjectUtils.pathToKey(outputPath, true)); + long actualUploads = StreamSupport.stream(uploads.spliterator(), false).count(); + Assert.assertEquals(expectedUploads, actualUploads); + } + + public void assertPartFiles(int num) throws IOException { + FileStatus[] files = fs.listStatus(outputPath, + f -> !MagicOutputStream.isMagic(new Path(f.toUri())) && f.toUri().toString().contains("part-")); + Assert.assertEquals(num, files.length); + Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(outputPath, true), ""); + List<ObjectInfo> infos = Arrays.stream(Iterables.toArray(objects, ObjectInfo.class)) + .filter(o -> o.key().contains("part-")).collect(Collectors.toList()); + Assert.assertEquals( + String.format("Part files number should be %d, but got %d", num, infos.size()), num, infos.size()); + } + + public void assertNoPartFiles() throws IOException { + FileStatus[] files = fs.listStatus(outputPath, + f -> !MagicOutputStream.isMagic(new Path(f.toUri())) && f.toUri().toString().contains("part-")); + Assert.assertEquals(0, files.length); + Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(outputPath, true), ""); + boolean anyPartFile = Iterables.any(objects, o -> o.key().contains("part-")); + Assert.assertFalse("Should have no part files", anyPartFile); + } + + public void dumpObjectStorage() { + if (dumpObjectStorage) { + LOG.info("===> Dump object storage - Start <==="); + dumpObjectKeys(); + dumpMultipartUploads(); + LOG.info("===> Dump object storage - End <==="); + } + } + + public void dumpObjectKeys() { + String prefix = ObjectUtils.pathToKey(magicJobPath()); + LOG.info("Dump object keys with prefix {}", prefix); + storage.listAll("", "").forEach(o -> LOG.info("Dump object keys - {}", o)); + } + + public void dumpMultipartUploads() { + String prefix = ObjectUtils.pathToKey(magicJobPath()); + LOG.info("Dump multi part uploads with prefix {}", prefix); + storage.listUploads("") + .forEach(u -> LOG.info("Dump multipart uploads - {}", u)); + } + + public void verifyPartContent() throws IOException { + String partKey = destPartKey(); + LOG.info("Part key to verify is: {}", partKey); + try (InputStream in = storage.get(partKey).stream()) { + byte[] data = IOUtils.toByteArray(in); + String expected = String.format("%s\t%s\n%s\t%s\n", KEY_1, VAL_1, KEY_2, VAL_2); + Assert.assertEquals(expected, new String(data, StandardCharsets.UTF_8)); + } + } + + public void assertSuccessMarkerNotExist() throws IOException { + Path succPath = CommitUtils.successMarker(outputPath); + Assert.assertFalse(String.format("%s should not exists", succPath), fs.exists(succPath)); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/CommitterTestBase.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/CommitterTestBase.java new file mode 100644 index 00000000000..8ccc4d837b9 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/CommitterTestBase.java @@ -0,0 +1,422 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.apache.hadoop.fs.tosfs.util.CommonUtils; +import org.apache.hadoop.fs.tosfs.util.UUIDUtils; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; + +public abstract class CommitterTestBase { + private Configuration conf; + private FileSystem fs; + private Path outputPath; + private TaskAttemptID job1Task0Attempt0; + private TaskAttemptID job2Task1Attempt0; + private Path reportDir; + + @Before + public void setup() throws IOException { + conf = newConf(); + fs = FileSystem.get(conf); + String uuid = UUIDUtils.random(); + outputPath = fs.makeQualified(new Path("/test/" + uuid)); + job1Task0Attempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 0, 0); + job2Task1Attempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 1, 0); + + reportDir = fs.makeQualified(new Path("/report/" + uuid)); + fs.mkdirs(reportDir); + conf.set(Committer.COMMITTER_SUMMARY_REPORT_DIR, reportDir.toUri().toString()); + } + + protected abstract Configuration newConf(); + + @After + public void teardown() { + CommonUtils.runQuietly(() -> fs.delete(outputPath, true)); + IOUtils.closeStream(fs); + } + + @AfterClass + public static void afterClass() { + List<String> committerThreads = Thread.getAllStackTraces().keySet() + .stream() + .map(Thread::getName) + .filter(n -> n.startsWith(Committer.THREADS_PREFIX)) + .collect(Collectors.toList()); + Assert.assertTrue("Outstanding committer threads", committerThreads.isEmpty()); + } + + private static String randomTrimmedJobId() { + SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd"); + return String.format("%s%04d_%04d", formatter.format(new Date()), + (long) (Math.random() * 1000), + (long) (Math.random() * 1000)); + } + + private static String randomFormedJobId() { + return String.format("job_%s", randomTrimmedJobId()); + } + + @Test + public void testSetupJob() throws IOException { + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + // Setup job. 
+ suite.setupJob(); + suite.dumpObjectStorage(); + suite.assertHasMagicKeys(); + } + + @Test + public void testSetupJobWithOrphanPaths() throws IOException, InterruptedException { + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + // Orphan success marker. + Path successPath = CommitUtils.successMarker(outputPath); + CommitUtils.save(fs, successPath, new byte[]{}); + Assert.assertTrue(fs.exists(successPath)); + + // Orphan job path. + Path jobPath = CommitUtils.magicJobPath(suite.committer().jobId(), outputPath); + fs.mkdirs(jobPath); + Assert.assertTrue("The job path should exist", fs.exists(jobPath)); + Path subPath = new Path(jobPath, "tmp.pending"); + CommitUtils.save(fs, subPath, new byte[]{}); + Assert.assertTrue("The sub path under the job path should exist.", fs.exists(subPath)); + FileStatus jobPathStatus = fs.getFileStatus(jobPath); + + Thread.sleep(1000L); + suite.setupJob(); + suite.dumpObjectStorage(); + suite.assertHasMagicKeys(); + + assertFalse("Should have deleted the success path", fs.exists(successPath)); + Assert.assertTrue("Should have re-created the job path", fs.exists(jobPath)); + assertFalse("Should have deleted the sub path under the job path", fs.exists(subPath)); + } + + @Test + public void testSetupTask() throws IOException { + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + // Leftover task attempt path. + Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(suite.taskAttemptContext(), outputPath); + Path subTaskAttemptPath = new Path(taskAttemptBasePath, "tmp.pending"); + CommitUtils.save(fs, subTaskAttemptPath, new byte[]{}); + Assert.assertTrue(fs.exists(taskAttemptBasePath)); + Assert.assertTrue(fs.exists(subTaskAttemptPath)); + + // Setup job. + suite.setupJob(); + suite.assertHasMagicKeys(); + // Setting up the job clears everything under the job path. + assertFalse(fs.exists(taskAttemptBasePath)); + assertFalse(fs.exists(subTaskAttemptPath)); + + // Leave some task paths behind again. + CommitUtils.save(fs, subTaskAttemptPath, new byte[]{}); + Assert.assertTrue(fs.exists(taskAttemptBasePath)); + Assert.assertTrue(fs.exists(subTaskAttemptPath)); + + // Setup task. + suite.setupTask(); + assertFalse(fs.exists(subTaskAttemptPath)); + } + + @Test + public void testCommitTask() throws Exception { + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + + // Setup job + suite.setupJob(); + suite.dumpObjectStorage(); + suite.assertHasMagicKeys(); + + // Setup task + suite.setupTask(); + + // Write records. + suite.assertNoMagicPendingFile(); + suite.assertMultipartUpload(0); + suite.writeOutput(); + suite.dumpObjectStorage(); + suite.assertHasMagicPendingFile(); + suite.assertNoMagicMultipartUpload(); + suite.assertMultipartUpload(1); + // Assert the pending file content. + Path pendingPath = suite.magicPendingPath(); + byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath); + Pending pending = Pending.deserialize(pendingData); + assertEquals(suite.destPartKey(), pending.destKey()); + assertEquals(20, pending.length()); + assertEquals(1, pending.parts().size()); + + // Commit the task. + suite.commitTask(); + + // Verify the pending set file. + suite.assertHasPendingSet(); + // Assert the pending set file content.
+ Path pendingSetPath = suite.magicPendingSetPath(); + byte[] pendingSetData = CommitUtils.load(suite.fs(), pendingSetPath); + PendingSet pendingSet = PendingSet.deserialize(pendingSetData); + assertEquals(suite.job().getJobID().toString(), pendingSet.jobId()); + assertEquals(1, pendingSet.commits().size()); + assertEquals(pending, pendingSet.commits().get(0)); + assertEquals(pendingSet.extraData(), + ImmutableMap.of(CommitUtils.TASK_ATTEMPT_ID, suite.taskAttemptContext().getTaskAttemptID().toString())); + + // Complete the multipart upload and verify the results. + ObjectStorage storage = suite.storage(); + storage.completeUpload(pending.destKey(), pending.uploadId(), pending.parts()); + suite.verifyPartContent(); + } + + @Test + public void testAbortTask() throws Exception { + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + suite.setupJob(); + suite.setupTask(); + + // Pre-check before the output write. + suite.assertNoMagicPendingFile(); + suite.assertMultipartUpload(0); + + // Execute the output write. + suite.writeOutput(); + + // Post-check after the output write. + suite.assertHasMagicPendingFile(); + suite.assertNoMagicMultipartUpload(); + suite.assertMultipartUpload(1); + // Assert the pending file content. + Path pendingPath = suite.magicPendingPath(); + byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath); + Pending pending = Pending.deserialize(pendingData); + assertEquals(suite.destPartKey(), pending.destKey()); + assertEquals(20, pending.length()); + assertEquals(1, pending.parts().size()); + + // Abort the task. + suite.abortTask(); + + // Verify the state after aborting task. + suite.assertNoMagicPendingFile(); + suite.assertNoMagicMultipartUpload(); + suite.assertMultipartUpload(0); + suite.assertNoTaskAttemptPath(); + } + + @Test + public void testCommitJob() throws Exception { + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + suite.setupJob(); + suite.setupTask(); + suite.writeOutput(); + suite.commitTask(); + + // Commit the job. + suite.assertNoPartFiles(); + suite.commitJob(); + // Verify the output. + suite.assertNoMagicMultipartUpload(); + suite.assertNoMagicObjectKeys(); + suite.assertSuccessMarker(); + suite.assertSummaryReport(reportDir); + suite.verifyPartContent(); + } + + + @Test + public void testCommitJobFailed() throws Exception { + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + suite.setupJob(); + suite.setupTask(); + suite.writeOutput(); + suite.commitTask(); + + // Commit the job. + suite.assertNoPartFiles(); + suite.commitJob(); + } + + @Test + public void testCommitJobSuccessMarkerFailed() throws Exception { + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + suite.setupJob(); + suite.setupTask(); + suite.writeOutput(); + suite.commitTask(); + + CommitUtils.injectError("marker"); + // Commit the job. + suite.assertNoPartFiles(); + assertThrows("Expect commit job error.", IOException.class, suite::commitJob); + CommitUtils.removeError("marker"); + + // Verify the output. 
+ suite.assertNoMagicMultipartUpload(); + suite.assertNoMagicObjectKeys(); + suite.assertSuccessMarkerNotExist(); + assertEquals(0, suite.fs().listStatus(suite.outputPath).length); + } + + @Test + public void testTaskCommitAfterJobCommit() throws Exception { + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + + suite.setupJob(); + suite.setupTask(); + suite.writeOutput(); + suite.commitTask(); + + // Commit the job + suite.assertNoPartFiles(); + suite.commitJob(); + // Verify the output. + suite.assertNoMagicMultipartUpload(); + suite.assertNoMagicObjectKeys(); + suite.assertSuccessMarker(); + suite.verifyPartContent(); + + // Commit the task again. + assertThrows(FileNotFoundException.class, suite::commitTask); + } + + @Test + public void testTaskCommitWithConsistentJobId() throws Exception { + Configuration conf = newConf(); + String consistentJobId = randomFormedJobId(); + conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId); + JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + + // By now, we have two "jobId"s, one is spark uuid, and the other is the jobId in taskAttempt. + // The job committer will adopt the former. + suite.setupJob(); + + // Next, we clear spark uuid, and set the jobId of taskAttempt to another value. In this case, + // the committer will take the jobId of taskAttempt as the final jobId, which is not consistent + // with the one that committer holds. + conf.unset(CommitUtils.SPARK_WRITE_UUID); + String anotherJobId = randomTrimmedJobId(); + TaskAttemptID taskAttemptId1 = JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID); + final TaskAttemptContext attemptContext1 = + JobSuite.createTaskAttemptContext(conf, taskAttemptId1, JobSuite.DEFAULT_APP_ATTEMPT_ID); + + assertThrows("JobId set in the context", IllegalArgumentException.class, + () -> suite.setupTask(attemptContext1)); + + // Even though we use another taskAttempt, as long as we ensure the spark uuid is consistent, + // the jobId in committer is consistent. + conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId); + conf.set(FileOutputFormat.OUTDIR, outputPath.toString()); + anotherJobId = randomTrimmedJobId(); + TaskAttemptID taskAttemptId2 = JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID); + TaskAttemptContext attemptContext2 = + JobSuite.createTaskAttemptContext(conf, taskAttemptId2, JobSuite.DEFAULT_APP_ATTEMPT_ID); + + suite.setupTask(attemptContext2); + // Write output must use the same task context with setup task. + suite.writeOutput(attemptContext2); + // Commit task must use the same task context with setup task. + suite.commitTask(attemptContext2); + suite.assertPendingSetAtRightLocation(); + + // Commit the job + suite.assertNoPartFiles(); + suite.commitJob(); + + // Verify the output. + suite.assertNoMagicMultipartUpload(); + suite.assertNoMagicObjectKeys(); + suite.assertSuccessMarker(); + suite.verifyPartContent(); + } + + @Test + public void testConcurrentJobs() throws Exception { + JobSuite suite1 = JobSuite.create(conf, job1Task0Attempt0, outputPath); + JobSuite suite2 = JobSuite.create(conf, job2Task1Attempt0, outputPath); + Assume.assumeFalse(suite1.skipTests()); + Assume.assumeFalse(suite2.skipTests()); + suite1.setupJob(); + suite2.setupJob(); + suite1.setupTask(); + suite2.setupTask(); + suite1.writeOutput(); + suite2.writeOutput(); + suite1.commitTask(); + suite2.commitTask(); + + // Job2 commit the job. 
+ suite2.assertNoPartFiles(); + suite2.commitJob(); + suite2.assertPartFiles(1); + + suite2.assertNoMagicMultipartUpload(); + suite2.assertNoMagicObjectKeys(); + suite2.assertSuccessMarker(); + suite2.assertSummaryReport(reportDir); + suite2.verifyPartContent(); + suite2.assertMagicPathExist(outputPath); + + // Job1 commit the job. + suite1.commitJob(); + suite2.assertPartFiles(2); + + // Verify the output. + suite1.assertNoMagicMultipartUpload(); + suite1.assertNoMagicObjectKeys(); + suite1.assertSuccessMarker(); + suite1.assertSummaryReport(reportDir); + suite1.verifyPartContent(); + suite1.assertMagicPathNotExist(outputPath); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/JobSuite.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/JobSuite.java new file mode 100644 index 00000000000..cba71c74802 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/JobSuite.java @@ -0,0 +1,219 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.net.NetUtils; +import org.junit.Assert; + +import java.io.IOException; + +public class JobSuite extends BaseJobSuite { + private static final CommitterFactory FACTORY = new CommitterFactory(); + private final JobContext jobContext; + private final TaskAttemptContext taskAttemptContext; + private final Committer committer; + + private JobSuite(FileSystem fs, Configuration conf, TaskAttemptID taskAttemptId, int appAttemptId, Path outputPath) + throws IOException { + this.fs = fs; + // Initialize the job instance. 
+ this.job = Job.getInstance(conf); + job.setJobID(JobID.forName(CommitUtils.buildJobId(conf, taskAttemptId.getJobID()))); + this.jobContext = createJobContext(job.getConfiguration(), taskAttemptId); + this.jobId = CommitUtils.buildJobId(jobContext); + this.taskAttemptContext = createTaskAttemptContext(job.getConfiguration(), taskAttemptId, appAttemptId); + + // Set job output directory. + FileOutputFormat.setOutputPath(job, outputPath); + this.outputPath = outputPath; + this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), conf); + + // Initialize committer. + this.committer = (Committer) FACTORY.createOutputCommitter(outputPath, taskAttemptContext); + } + + public static JobSuite create(Configuration conf, TaskAttemptID taskAttemptId, Path outDir) throws IOException { + FileSystem fs = outDir.getFileSystem(conf); + return new JobSuite(fs, conf, taskAttemptId, DEFAULT_APP_ATTEMPT_ID, outDir); + } + + public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int attemptId) { + String attempt = String.format("attempt_%s_m_000000_%d", trimmedJobId, attemptId); + return TaskAttemptID.forName(attempt); + } + + public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int taskId, int attemptId) { + String[] parts = trimmedJobId.split("_"); + return new TaskAttemptID(parts[0], Integer.parseInt(parts[1]), TaskType.MAP, taskId, attemptId); + } + + public static JobContext createJobContext(Configuration jobConf, TaskAttemptID taskAttemptId) { + return new JobContextImpl(jobConf, taskAttemptId.getJobID()); + } + + public static TaskAttemptContext createTaskAttemptContext( + Configuration jobConf, TaskAttemptID taskAttemptId, int appAttemptId) throws IOException { + // Set the key values for job configuration. + jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId.toString()); + jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId); + jobConf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS, CommitterFactory.class.getName()); + return new TaskAttemptContextImpl(jobConf, taskAttemptId); + } + + public void setupJob() throws IOException { + committer.setupJob(jobContext); + } + + public void setupTask() throws IOException { + committer.setupTask(taskAttemptContext); + } + + // This method simulates the scenario that the job may set up task with a different + // taskAttemptContext, e.g., for a spark job. + public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { + committer.setupTask(taskAttemptContext); + } + + public void writeOutput() throws Exception { + writeOutput(taskAttemptContext); + } + + // This method simulates the scenario that the job may set up task with a different + // taskAttemptContext, e.g., for a spark job. 
+ public void writeOutput(TaskAttemptContext taskAttemptContext) throws Exception { + RecordWriter<Object, Object> writer = new TextOutputFormat<>().getRecordWriter(taskAttemptContext); + NullWritable nullKey = NullWritable.get(); + NullWritable nullVal = NullWritable.get(); + Object[] keys = new Object[]{KEY_1, nullKey, null, nullKey, null, KEY_2}; + Object[] vals = new Object[]{VAL_1, nullVal, null, null, nullVal, VAL_2}; + try { + Assert.assertEquals(keys.length, vals.length); + for (int i = 0; i < keys.length; i++) { + writer.write(keys[i], vals[i]); + } + } finally { + writer.close(taskAttemptContext); + } + } + + public boolean needsTaskCommit() { + return committer.needsTaskCommit(taskAttemptContext); + } + + public void commitTask() throws IOException { + committer.commitTask(taskAttemptContext); + } + + // This method simulates the scenario that the job may set up task with a different + // taskAttemptContext, e.g., for a spark job. + public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { + committer.commitTask(taskAttemptContext); + } + + public void abortTask() throws IOException { + committer.abortTask(taskAttemptContext); + } + + public void commitJob() throws IOException { + committer.commitJob(jobContext); + } + + @Override + public Path magicPartPath() { + return new Path(committer.getWorkPath(), FileOutputFormat.getUniqueFile(taskAttemptContext, "part", "")); + } + + @Override + public Path magicPendingSetPath() { + return CommitUtils.magicTaskPendingSetPath(taskAttemptContext, outputPath); + } + + public TaskAttemptContext taskAttemptContext() { + return taskAttemptContext; + } + + public Committer committer() { + return committer; + } + + @Override + public void assertNoTaskAttemptPath() throws IOException { + Path path = CommitUtils.magicTaskAttemptBasePath(taskAttemptContext, outputPath); + Assert.assertFalse("Task attempt path should be not existing", fs.exists(path)); + String pathToKey = ObjectUtils.pathToKey(path); + Assert.assertNull("Should have no task attempt path key", storage.head(pathToKey)); + } + + @Override + protected boolean skipTests() { + return storage.bucket().isDirectory(); + } + + @Override + public void assertSuccessMarker() throws IOException { + Path succPath = CommitUtils.successMarker(outputPath); + Assert.assertTrue(String.format("%s should be exists", succPath), fs.exists(succPath)); + SuccessData successData = SuccessData.deserialize(CommitUtils.load(fs, succPath)); + Assert.assertEquals(SuccessData.class.getName(), successData.name()); + Assert.assertTrue(successData.success()); + Assert.assertEquals(NetUtils.getHostname(), successData.hostname()); + Assert.assertEquals(CommitUtils.COMMITTER_NAME, successData.committer()); + Assert.assertEquals( + String.format("Task committer %s", taskAttemptContext.getTaskAttemptID()), + successData.description()); + Assert.assertEquals(job.getJobID().toString(), successData.jobId()); + Assert.assertEquals(1, successData.filenames().size()); + Assert.assertEquals(destPartKey(), successData.filenames().get(0)); + } + + @Override + public void assertSummaryReport(Path reportDir) throws IOException { + Path reportPath = CommitUtils.summaryReport(reportDir, job().getJobID().toString()); + Assert.assertTrue(String.format("%s should be exists", reportPath), fs.exists(reportPath)); + SuccessData reportData = SuccessData.deserialize(CommitUtils.load(fs, reportPath)); + Assert.assertEquals(SuccessData.class.getName(), reportData.name()); + Assert.assertTrue(reportData.success()); + 
Assert.assertEquals(NetUtils.getHostname(), reportData.hostname()); + Assert.assertEquals(CommitUtils.COMMITTER_NAME, reportData.committer()); + Assert.assertEquals( + String.format("Task committer %s", taskAttemptContext.getTaskAttemptID()), + reportData.description()); + Assert.assertEquals(job.getJobID().toString(), reportData.jobId()); + Assert.assertEquals(1, reportData.filenames().size()); + Assert.assertEquals(destPartKey(), reportData.filenames().get(0)); + Assert.assertEquals("clean", reportData.diagnostics().get("stage")); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/MRJobTestBase.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/MRJobTestBase.java new file mode 100644 index 00000000000..5a5d30f63de --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/MRJobTestBase.java @@ -0,0 +1,232 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.terasort.TeraGen; +import org.apache.hadoop.examples.terasort.TeraSort; +import org.apache.hadoop.examples.terasort.TeraSortConfigKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.object.ObjectInfo; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.fs.tosfs.util.UUIDUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.WordCount; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public abstract class MRJobTestBase { + private static final Logger LOG = LoggerFactory.getLogger(MRJobTestBase.class); + + private static Configuration conf = new Configuration(); + private static MiniMRYarnCluster yarnCluster; + + private static FileSystem fs; + + private static Path testDataPath; + + public static void setConf(Configuration newConf) { + conf = newConf; + } + + @BeforeClass + public static void beforeClass() throws IOException { + conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false); + conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false); + 
conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100); + + conf.set("mapreduce.outputcommitter.factory.scheme.tos", + CommitterFactory.class.getName()); // 3x newApiCommitter=true. + conf.set("mapred.output.committer.class", + Committer.class.getName()); // 2x and 3x newApiCommitter=false. + conf.set("mapreduce.outputcommitter.class", + org.apache.hadoop.fs.tosfs.commit.Committer.class.getName()); // 2x newApiCommitter=true. + + // Start the yarn cluster. + yarnCluster = new MiniMRYarnCluster("yarn-" + System.currentTimeMillis(), 2); + LOG.info("Default filesystem: {}", conf.get("fs.defaultFS")); + LOG.info("Default filesystem implementation: {}", conf.get("fs.AbstractFileSystem.tos.impl")); + + yarnCluster.init(conf); + yarnCluster.start(); + + fs = FileSystem.get(conf); + testDataPath = new Path("/mr-test-" + UUIDUtils.random()) + .makeQualified(fs.getUri(), fs.getWorkingDirectory()); + } + + @AfterClass + public static void afterClass() throws IOException { + fs.delete(testDataPath, true); + if (yarnCluster != null) { + yarnCluster.stop(); + } + } + + @Before + public void before() throws IOException { + } + + @After + public void after() throws IOException { + } + + @Test + public void testTeraGen() throws Exception { + Path teraGenPath = new Path(testDataPath, "teraGen").makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path output = new Path(teraGenPath, "output"); + JobConf jobConf = new JobConf(yarnCluster.getConfig()); + jobConf.addResource(conf); + jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000); + jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10); + jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false); + + String[] args = new String[]{Integer.toString(1000), output.toString()}; + int result = ToolRunner.run(jobConf, new TeraGen(), args); + Assert.assertEquals(String.format("teragen %s", StringUtils.join(" ", args)), 0, result); + + // Verify the success data. + ObjectStorage storage = ObjectStorageFactory.create( + output.toUri().getScheme(), output.toUri().getAuthority(), conf); + int byteSizes = 0; + + Path success = new Path(output, CommitUtils._SUCCESS); + byte[] serializedData = CommitUtils.load(fs, success); + SuccessData successData = SuccessData.deserialize(serializedData); + Assert.assertTrue("Should execute successfully", successData.success()); + // Assert the destination paths. 
+ Assert.assertEquals(2, successData.filenames().size()); + successData.filenames().sort(String::compareTo); + Assert.assertEquals(ObjectUtils.pathToKey(new Path(output, "part-m-00000")), + successData.filenames().get(0)); + Assert.assertEquals(ObjectUtils.pathToKey(new Path(output, "part-m-00001")), + successData.filenames().get(1)); + + for (String partFileKey : successData.filenames()) { + ObjectInfo objectInfo = storage.head(partFileKey); + Assert.assertNotNull("Output file should be existing", objectInfo); + byteSizes += objectInfo.size(); + } + + Assert.assertEquals(byteSizes, 100 /* Each row 100 bytes */ * 1000 /* total 1000 rows */); + } + + @Test + public void testTeraSort() throws Exception { + Path teraGenPath = new Path(testDataPath, "teraGen").makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path inputPath = new Path(teraGenPath, "output"); + Path outputPath = new Path(teraGenPath, "sortOutput"); + JobConf jobConf = new JobConf(yarnCluster.getConfig()); + jobConf.addResource(conf); + jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000); + jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10); + jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false); + String[] args = new String[]{inputPath.toString(), outputPath.toString()}; + int result = ToolRunner.run(jobConf, new TeraSort(), args); + Assert.assertEquals(String.format("terasort %s", StringUtils.join(" ", args)), 0, result); + + // Verify the success data. + ObjectStorage storage = ObjectStorageFactory + .create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), conf); + int byteSizes = 0; + + Path success = new Path(outputPath, CommitUtils._SUCCESS); + byte[] serializedData = CommitUtils.load(fs, success); + SuccessData successData = SuccessData.deserialize(serializedData); + Assert.assertTrue("Should execute successfully", successData.success()); + // Assert the destination paths. + Assert.assertEquals(1, successData.filenames().size()); + successData.filenames().sort(String::compareTo); + Assert.assertEquals(ObjectUtils.pathToKey(new Path(outputPath, "part-r-00000")), successData.filenames().get(0)); + + for (String partFileKey : successData.filenames()) { + ObjectInfo objectInfo = storage.head(partFileKey); + Assert.assertNotNull("Output file should be existing", objectInfo); + byteSizes += objectInfo.size(); + } + + Assert.assertEquals(byteSizes, 100 /* Each row 100 bytes */ * 1000 /* total 1000 rows */); + } + + @Test + public void testWordCount() throws Exception { + Path wordCountPath = new Path(testDataPath, "wc").makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path output = new Path(wordCountPath, "output"); + Path input = new Path(wordCountPath, "input"); + JobConf jobConf = new JobConf(yarnCluster.getConfig()); + jobConf.addResource(conf); + jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000); + jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10); + jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false); + + if (!fs.mkdirs(input)) { + throw new IOException("Mkdirs failed to create " + input.toString()); + } + + DataOutputStream file = fs.create(new Path(input, "part-0")); + file.writeBytes("a a b c"); + file.close(); + + String[] args = new String[]{input.toString(), output.toString()}; + int result = ToolRunner.run(jobConf, new WordCount(), args); + Assert.assertEquals(String.format("WordCount %s", StringUtils.join(" ", args)), 0, result); + + // Verify the success path. 
+ Assert.assertTrue(fs.exists(new Path(output, CommitUtils._SUCCESS))); + Assert.assertTrue(fs.exists(new Path(output, "part-00000"))); + + Path success = new Path(output, CommitUtils._SUCCESS); + Assert.assertTrue("Success file must be not empty", CommitUtils.load(fs, success).length != 0); + + byte[] serializedData = CommitUtils.load(fs, new Path(output, "part-00000")); + String outputAsStr = new String(serializedData); + Map<String, Integer> resAsMap = getResultAsMap(outputAsStr); + Assert.assertEquals(2, (int) resAsMap.get("a")); + Assert.assertEquals(1, (int) resAsMap.get("b")); + Assert.assertEquals(1, (int) resAsMap.get("c")); + } + + private Map<String, Integer> getResultAsMap(String outputAsStr) { + Map<String, Integer> result = new HashMap<>(); + for (String line : outputAsStr.split("\n")) { + String[] tokens = line.split("\t"); + Assert.assertTrue(String.format("Not enough tokens in in string %s from output %s", line, outputAsStr), + tokens.length > 1); + result.put(tokens[0], Integer.parseInt(tokens[1])); + } + return result; + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestCommitter.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestCommitter.java new file mode 100644 index 00000000000..e7176047f32 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestCommitter.java @@ -0,0 +1,29 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.tosfs.util.ParseUtils; + +public class TestCommitter extends CommitterTestBase { + @Override + protected Configuration newConf() { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", String.format("tos://%s", ParseUtils.envAsString("TOS_BUCKET", false))); + return conf; + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMRJob.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMRJob.java new file mode 100644 index 00000000000..d868380802b --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMRJob.java @@ -0,0 +1,49 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.tosfs.conf.ConfKeys; +import org.apache.hadoop.fs.tosfs.conf.TosKeys; +import org.apache.hadoop.fs.tosfs.object.tos.TOS; +import org.apache.hadoop.fs.tosfs.util.ParseUtils; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.junit.BeforeClass; + +import java.io.IOException; + +public class TestMRJob extends MRJobTestBase { + + @BeforeClass + public static void beforeClass() throws IOException { + // Create the new configuration and set it to the IT Case. + Configuration newConf = new Configuration(); + newConf.set("fs.defaultFS", String.format("tos://%s", TestUtility.bucket())); + // Application in yarn cluster cannot read the environment variables from user bash, so here we + // set it into the config manually. + newConf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("tos"), + ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT, false)); + newConf.set(TosKeys.FS_TOS_ACCESS_KEY_ID, + ParseUtils.envAsString(TOS.ENV_TOS_ACCESS_KEY_ID, false)); + newConf.set(TosKeys.FS_TOS_SECRET_ACCESS_KEY, + ParseUtils.envAsString(TOS.ENV_TOS_SECRET_ACCESS_KEY, false)); + + MRJobTestBase.setConf(newConf); + // Continue to prepare the IT Case environments. + MRJobTestBase.beforeClass(); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java index 0adba267667..b51505b47f8 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java @@ -186,7 +186,7 @@ public class TestMagicOutputStream extends ObjectStorageTestBase { private class TestingMagicOutputStream extends MagicOutputStream { TestingMagicOutputStream(Path magic) { - super(fs, storage, threadPool, protonConf, magic); + super(fs, storage, threadPool, tosConf, magic); } protected void persist(Path p, byte[] data) { diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/CommitterTestBase.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/CommitterTestBase.java new file mode 100644 index 00000000000..3a94e3b0181 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/CommitterTestBase.java @@ -0,0 +1,364 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.tosfs.commit.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.commit.CommitUtils; +import org.apache.hadoop.fs.tosfs.commit.Pending; +import org.apache.hadoop.fs.tosfs.commit.PendingSet; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.apache.hadoop.fs.tosfs.util.CommonUtils; +import org.apache.hadoop.fs.tosfs.util.UUIDUtils; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class CommitterTestBase { + private Configuration conf; + private FileSystem fs; + private Path outputPath; + private TaskAttemptID taskAttempt0; + private Path reportDir; + + @Before + public void setup() throws IOException { + conf = newConf(); + fs = FileSystem.get(conf); + String uuid = UUIDUtils.random(); + outputPath = fs.makeQualified(new Path("/test/" + uuid)); + taskAttempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 0); + + reportDir = fs.makeQualified(new Path("/report/" + uuid)); + fs.mkdirs(reportDir); + conf.set(org.apache.hadoop.fs.tosfs.commit.Committer.COMMITTER_SUMMARY_REPORT_DIR, + reportDir.toUri().toString()); + } + + protected abstract Configuration newConf(); + + @After + public void teardown() { + CommonUtils.runQuietly(() -> fs.delete(outputPath, true)); + IOUtils.closeStream(fs); + } + + @AfterClass + public static void afterClass() { + List<String> committerThreads = Thread.getAllStackTraces().keySet() + .stream() + .map(Thread::getName) + .filter(n -> n.startsWith(org.apache.hadoop.fs.tosfs.commit.Committer.THREADS_PREFIX)) + .collect(Collectors.toList()); + Assert.assertTrue("Outstanding committer threads", committerThreads.isEmpty()); + } + + private static String randomTrimmedJobId() { + SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd"); + return String.format("%s%04d_%04d", formatter.format(new Date()), + (long) (Math.random() * 1000), + (long) (Math.random() * 1000)); + } + + private static String randomFormedJobId() { + return String.format("job_%s", randomTrimmedJobId()); + } + + @Test + public void testSetupJob() throws IOException { + JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + + // Setup job. + suite.setupJob(); + suite.dumpObjectStorage(); + suite.assertHasMagicKeys(); + } + + @Test + public void testSetupJobWithOrphanPaths() throws IOException, InterruptedException { + JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + + // Orphan success marker. + Path successPath = CommitUtils.successMarker(outputPath); + CommitUtils.save(fs, successPath, new byte[]{}); + Assert.assertTrue(fs.exists(successPath)); + + // Orphan job path. 
+    Path jobPath = CommitUtils.magicJobPath(suite.committer().jobId(), outputPath);
+    fs.mkdirs(jobPath);
+    Assert.assertTrue("The job path should exist", fs.exists(jobPath));
+    Path subPath = new Path(jobPath, "tmp.pending");
+    CommitUtils.save(fs, subPath, new byte[]{});
+    Assert.assertTrue("The sub path under the job path should exist.", fs.exists(subPath));
+    FileStatus jobPathStatus = fs.getFileStatus(jobPath);
+
+    Thread.sleep(1000L);
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    Assert.assertFalse("Should have deleted the success path", fs.exists(successPath));
+    Assert.assertTrue("Should have re-created the job path", fs.exists(jobPath));
+    Assert.assertFalse("Should have deleted the sub path under the job path", fs.exists(subPath));
+  }
+
+  @Test
+  public void testSetupTask() throws IOException {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // Leftover task attempt path.
+    Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(suite.taskAttemptContext(), outputPath);
+    Path subTaskAttemptPath = new Path(taskAttemptBasePath, "tmp.pending");
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup job.
+    suite.setupJob();
+    suite.assertHasMagicKeys();
+    // Setting up the job clears the whole job path.
+    Assert.assertFalse(fs.exists(taskAttemptBasePath));
+    Assert.assertFalse(fs.exists(subTaskAttemptPath));
+
+    // Leave some task paths behind.
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup task.
+    suite.setupTask();
+    Assert.assertFalse(fs.exists(subTaskAttemptPath));
+  }
+
+  @Test
+  public void testCommitTask() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    // Setup job
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    // Setup task
+    suite.setupTask();
+
+    // Write records.
+    suite.assertNoMagicPendingFile();
+    suite.assertMultipartUpload(0);
+    suite.writeOutput();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(1);
+    // Assert the pending file content.
+    Path pendingPath = suite.magicPendingPath();
+    byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
+    Pending pending = Pending.deserialize(pendingData);
+    Assert.assertEquals(suite.destPartKey(), pending.destKey());
+    Assert.assertEquals(20, pending.length());
+    Assert.assertEquals(1, pending.parts().size());
+
+    // Commit the task.
+    suite.commitTask();
+
+    // Verify the pending set file.
+    suite.assertHasPendingSet();
+    // Assert the pending set file content.
+    Path pendingSetPath = suite.magicPendingSetPath();
+    byte[] pendingSetData = CommitUtils.load(suite.fs(), pendingSetPath);
+    PendingSet pendingSet = PendingSet.deserialize(pendingSetData);
+    Assert.assertEquals(suite.job().getJobID().toString(), pendingSet.jobId());
+    Assert.assertEquals(1, pendingSet.commits().size());
+    Assert.assertEquals(pending, pendingSet.commits().get(0));
+    Assert.assertEquals(pendingSet.extraData(),
+        ImmutableMap.of(CommitUtils.TASK_ATTEMPT_ID, suite.taskAttemptContext().getTaskAttemptID().toString()));
+
+    // Complete the multipart upload and verify the results.
+ ObjectStorage storage = suite.storage(); + storage.completeUpload(pending.destKey(), pending.uploadId(), pending.parts()); + suite.verifyPartContent(); + } + + @Test + public void testAbortTask() throws Exception { + JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + suite.setupJob(); + suite.setupTask(); + + // Pre-check before the output write. + suite.assertNoMagicPendingFile(); + suite.assertMultipartUpload(0); + + // Execute the output write. + suite.writeOutput(); + + // Post-check after the output write. + suite.assertHasMagicPendingFile(); + suite.assertNoMagicMultipartUpload(); + suite.assertMultipartUpload(1); + // Assert the pending file content. + Path pendingPath = suite.magicPendingPath(); + byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath); + Pending pending = Pending.deserialize(pendingData); + Assert.assertEquals(suite.destPartKey(), pending.destKey()); + Assert.assertEquals(20, pending.length()); + Assert.assertEquals(1, pending.parts().size()); + + // Abort the task. + suite.abortTask(); + + // Verify the state after aborting task. + suite.assertNoMagicPendingFile(); + suite.assertNoMagicMultipartUpload(); + suite.assertMultipartUpload(0); + suite.assertNoTaskAttemptPath(); + } + + @Test + public void testCommitJob() throws Exception { + JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + suite.setupJob(); + suite.setupTask(); + suite.writeOutput(); + suite.commitTask(); + + // Commit the job. + suite.assertNoPartFiles(); + suite.commitJob(); + // Verify the output. + suite.assertNoMagicMultipartUpload(); + suite.assertNoMagicObjectKeys(); + suite.assertSuccessMarker(); + suite.assertSummaryReport(reportDir); + suite.verifyPartContent(); + } + + + @Test + public void testCommitJobFailed() throws Exception { + JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + suite.setupJob(); + suite.setupTask(); + suite.writeOutput(); + suite.commitTask(); + + // Commit the job. + suite.assertNoPartFiles(); + suite.commitJob(); + } + + @Test + public void testTaskCommitAfterJobCommit() throws Exception { + JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + suite.setupJob(); + suite.setupTask(); + suite.writeOutput(); + suite.commitTask(); + + // Commit the job + suite.assertNoPartFiles(); + suite.commitJob(); + // Verify the output. + suite.assertNoMagicMultipartUpload(); + suite.assertNoMagicObjectKeys(); + suite.assertSuccessMarker(); + suite.verifyPartContent(); + + // Commit the task again. + Assert.assertThrows(FileNotFoundException.class, suite::commitTask); + } + + @Test + public void testTaskCommitWithConsistentJobId() throws Exception { + Configuration conf = newConf(); + String consistentJobId = randomFormedJobId(); + conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId); + JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath); + Assume.assumeFalse(suite.skipTests()); + + // By now, we have two "jobId"s, one is spark uuid, and the other is the jobId in taskAttempt. + // The job committer will adopt the former. + suite.setupJob(); + + // Next, we clear spark uuid, and set the jobId of taskAttempt to another value. In this case, + // the committer will take the jobId of taskAttempt as the final jobId, which is not consistent + // with the one that committer holds. 
+ conf.unset(CommitUtils.SPARK_WRITE_UUID); + JobConf jobConf = new JobConf(conf); + String anotherJobId = randomTrimmedJobId(); + TaskAttemptID taskAttemptId1 = + JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID); + final TaskAttemptContext attemptContext1 = + JobSuite.createTaskAttemptContext(jobConf, taskAttemptId1, JobSuite.DEFAULT_APP_ATTEMPT_ID); + + Assert.assertThrows("JobId set in the context", IllegalArgumentException.class, + () -> suite.setupTask(attemptContext1)); + + // Even though we use another taskAttempt, as long as we ensure the spark uuid is consistent, + // the jobId in committer is consistent. + conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId); + conf.set(FileOutputFormat.OUTDIR, outputPath.toString()); + jobConf = new JobConf(conf); + anotherJobId = randomTrimmedJobId(); + TaskAttemptID taskAttemptId2 = + JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID); + TaskAttemptContext attemptContext2 = + JobSuite.createTaskAttemptContext(jobConf, taskAttemptId2, JobSuite.DEFAULT_APP_ATTEMPT_ID); + + suite.setupTask(attemptContext2); + // Write output must use the same task context with setup task. + suite.writeOutput(attemptContext2); + // Commit task must use the same task context with setup task. + suite.commitTask(attemptContext2); + suite.assertPendingSetAtRightLocation(); + + // Commit the job + suite.assertNoPartFiles(); + suite.commitJob(); + + // Verify the output. + suite.assertNoMagicMultipartUpload(); + suite.assertNoMagicObjectKeys(); + suite.assertSuccessMarker(); + suite.verifyPartContent(); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/JobSuite.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/JobSuite.java new file mode 100644 index 00000000000..034bc3460a1 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/JobSuite.java @@ -0,0 +1,228 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.tosfs.commit.mapred; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.commit.BaseJobSuite; +import org.apache.hadoop.fs.tosfs.commit.CommitUtils; +import org.apache.hadoop.fs.tosfs.commit.SuccessData; +import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TextOutputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.net.NetUtils; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class JobSuite extends BaseJobSuite { + private static final Logger LOG = LoggerFactory.getLogger(JobSuite.class); + private final JobContext jobContext; + private final TaskAttemptContext taskAttemptContext; + private final Committer committer; + + private JobSuite(FileSystem fs, JobConf conf, + TaskAttemptID taskAttemptId, int appAttemptId, Path outputPath) + throws IOException { + this.fs = fs; + // Initialize the job instance. + this.job = Job.getInstance(conf); + job.setJobID(JobID.forName(CommitUtils.buildJobId(conf, taskAttemptId.getJobID()))); + this.jobContext = createJobContext(conf, taskAttemptId); + this.taskAttemptContext = createTaskAttemptContext(conf, taskAttemptId, appAttemptId); + this.jobId = CommitUtils.buildJobId(jobContext); + + // Set job output directory. + FileOutputFormat.setOutputPath(conf, outputPath); + this.outputPath = outputPath; + this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(), + outputPath.toUri().getAuthority(), conf); + + // Initialize committer. + this.committer = new Committer(); + this.committer.setupTask(taskAttemptContext); + } + + public static JobSuite create(Configuration conf, TaskAttemptID taskAttemptId, Path outDir) + throws IOException { + FileSystem fs = outDir.getFileSystem(conf); + return new JobSuite(fs, new JobConf(conf), taskAttemptId, DEFAULT_APP_ATTEMPT_ID, outDir); + } + + public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int attemptId) { + String attempt = String.format("attempt_%s_m_000000_%d", trimmedJobId, attemptId); + return TaskAttemptID.forName(attempt); + } + + public static JobContext createJobContext(JobConf jobConf, TaskAttemptID taskAttemptId) { + return new JobContextImpl(jobConf, taskAttemptId.getJobID()); + } + + public static TaskAttemptContext createTaskAttemptContext( + JobConf jobConf, TaskAttemptID taskAttemptId, int appAttemptId) throws IOException { + // Set the key values for job configuration. + jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId.toString()); + jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId); + jobConf.set("mapred.output.committer.class", + Committer.class.getName()); // 2x and 3x newApiCommitter=false. 
+ return new TaskAttemptContextImpl(jobConf, taskAttemptId); + } + + public void setupJob() throws IOException { + committer.setupJob(jobContext); + } + + public void setupTask() throws IOException { + committer.setupTask(taskAttemptContext); + } + + // This method simulates the scenario that the job may set up task with a different + // taskAttemptContext, e.g., for a spark job. + public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { + committer.setupTask(taskAttemptContext); + } + + public void writeOutput() throws Exception { + writeOutput(taskAttemptContext); + } + + // This method simulates the scenario that the job may set up task with a different + // taskAttemptContext, e.g., for a spark job. + public void writeOutput(TaskAttemptContext taskAttemptContext) throws Exception { + RecordWriter<Object, Object> writer = new TextOutputFormat<>().getRecordWriter(fs, + taskAttemptContext.getJobConf(), + CommitUtils.buildJobId(taskAttemptContext), + taskAttemptContext.getProgressible()); + NullWritable nullKey = NullWritable.get(); + NullWritable nullVal = NullWritable.get(); + Object[] keys = new Object[]{KEY_1, nullKey, null, nullKey, null, KEY_2}; + Object[] vals = new Object[]{VAL_1, nullVal, null, null, nullVal, VAL_2}; + try { + Assert.assertEquals(keys.length, vals.length); + for (int i = 0; i < keys.length; i++) { + writer.write(keys[i], vals[i]); + } + } finally { + writer.close(Reporter.NULL); + } + } + + public boolean needsTaskCommit() throws IOException { + return committer.needsTaskCommit(taskAttemptContext); + } + + public void commitTask() throws IOException { + committer.commitTask(taskAttemptContext); + } + + // This method simulates the scenario that the job may set up task with a different + // taskAttemptContext, e.g., for a spark job. 
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    committer.commitTask(taskAttemptContext);
+  }
+
+  public void abortTask() throws IOException {
+    committer.abortTask(taskAttemptContext);
+  }
+
+  public void commitJob() throws IOException {
+    committer.commitJob(jobContext);
+  }
+
+  @Override
+  public Path magicPartPath() {
+    return new Path(committer.getWorkPath(), committer.jobId());
+  }
+
+  @Override
+  public Path magicPendingSetPath() {
+    return CommitUtils.magicTaskPendingSetPath(taskAttemptContext, outputPath);
+  }
+
+  public TaskAttemptContext taskAttemptContext() {
+    return taskAttemptContext;
+  }
+
+  public Committer committer() {
+    return committer;
+  }
+
+  @Override
+  public void assertNoTaskAttemptPath() throws IOException {
+    Path path = CommitUtils.magicTaskAttemptBasePath(taskAttemptContext, outputPath);
+    Assert.assertFalse("Task attempt path should not exist", fs.exists(path));
+    String pathToKey = ObjectUtils.pathToKey(path);
+    Assert.assertNull("Should have no task attempt path key", storage.head(pathToKey));
+  }
+
+  @Override
+  protected boolean skipTests() {
+    return storage.bucket().isDirectory();
+  }
+
+  @Override
+  public void assertSuccessMarker() throws IOException {
+    Path succPath = CommitUtils.successMarker(outputPath);
+    Assert.assertTrue(String.format("%s should exist", succPath), fs.exists(succPath));
+    SuccessData successData = SuccessData.deserialize(CommitUtils.load(fs, succPath));
+    Assert.assertEquals(SuccessData.class.getName(), successData.name());
+    Assert.assertTrue(successData.success());
+    Assert.assertEquals(NetUtils.getHostname(), successData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, successData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", taskAttemptContext.getTaskAttemptID()),
+        successData.description());
+    Assert.assertEquals(job.getJobID().toString(), successData.jobId());
+    Assert.assertEquals(1, successData.filenames().size());
+    Assert.assertEquals(destPartKey(), successData.filenames().get(0));
+  }
+
+  @Override
+  public void assertSummaryReport(Path reportDir) throws IOException {
+    Path reportPath = CommitUtils.summaryReport(reportDir, job().getJobID().toString());
+    Assert.assertTrue(String.format("%s should exist", reportPath), fs.exists(reportPath));
+    SuccessData reportData = SuccessData.deserialize(CommitUtils.load(fs, reportPath));
+    Assert.assertEquals(SuccessData.class.getName(), reportData.name());
+    Assert.assertTrue(reportData.success());
+    Assert.assertEquals(NetUtils.getHostname(), reportData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, reportData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", taskAttemptContext.getTaskAttemptID()),
+        reportData.description());
+    Assert.assertEquals(job.getJobID().toString(), reportData.jobId());
+    Assert.assertEquals(1, reportData.filenames().size());
+    Assert.assertEquals(destPartKey(), reportData.filenames().get(0));
+    Assert.assertEquals("clean", reportData.diagnostics().get("stage"));
+  }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/TestCommitter.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/TestCommitter.java
new file mode 100644
index 00000000000..48f90ec3d30
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/TestCommitter.java
@@ -0,0 +1,29 @@
+/*
+ *
ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.tosfs.util.TestUtility; + +public class TestCommitter extends CommitterTestBase { + @Override + protected Configuration newConf() { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", String.format("tos://%s", TestUtility.bucket())); + return conf; + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java index a3677a54df1..266f704bf6f 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java @@ -38,7 +38,7 @@ import java.io.IOException; public class ObjectStorageTestBase { private static final Logger LOG = LoggerFactory.getLogger(ObjectStorageTestBase.class); protected Configuration conf; - protected Configuration protonConf; + protected Configuration tosConf; protected Path testDir; protected FileSystem fs; protected String scheme; @@ -55,14 +55,14 @@ public class ObjectStorageTestBase { conf = new Configuration(); conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("filestore"), tempDirPath); conf.set("fs.filestore.impl", LocalFileSystem.class.getName()); - protonConf = new Configuration(conf); + tosConf = new Configuration(conf); // Set the environment variable for ObjectTestUtils#assertObject TestUtility.setSystemEnv(FileStore.ENV_FILE_STORAGE_ROOT, tempDirPath); testDir = new Path("filestore://" + FileStore.DEFAULT_BUCKET + "/", UUIDUtils.random()); fs = testDir.getFileSystem(conf); scheme = testDir.toUri().getScheme(); - storage = ObjectStorageFactory.create(scheme, testDir.toUri().getAuthority(), protonConf); + storage = ObjectStorageFactory.create(scheme, testDir.toUri().getAuthority(), tosConf); } @After diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java index ac450d6d958..ff1dd89d186 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java @@ -73,7 +73,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase { for (int i = 0; i < 3; i++) { tmpDirs.add(tmp.newDir()); } - Configuration newConf = new Configuration(protonConf); + Configuration newConf = new Configuration(tosConf); newConf.set(ConfKeys.FS_MULTIPART_STAGING_DIR.key("filestore"), 
Joiner.on(",").join(tmpDirs)); // Start multiple threads to open streams to create staging dir. @@ -91,7 +91,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase { @Test public void testWriteZeroByte() throws IOException { Path zeroByteTxt = path("zero-byte.txt"); - ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, zeroByteTxt, true); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, zeroByteTxt, true); // write zero-byte and close. out.write(new byte[0], 0, 0); out.close(); @@ -104,7 +104,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase { @Test public void testWriteZeroByteWithoutAllowPut() throws IOException { Path zeroByteTxt = path("zero-byte-without-allow-put.txt"); - ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, zeroByteTxt, false); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, zeroByteTxt, false); // write zero-byte and close. out.close(); assertStagingPart(0, out.stagingParts()); @@ -116,7 +116,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase { @Test public void testDeleteStagingFileWhenUploadPartsOK() throws IOException { Path path = path("data.txt"); - ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, path, true); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, path, true); byte[] data = TestUtility.rand((int) (ConfKeys.FS_MULTIPART_SIZE_DEFAULT * 2)); out.write(data); out.waitForPartsUpload(); @@ -132,7 +132,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase { @Test public void testDeleteStagingFileWithClose() throws IOException { Path path = path("data.txt"); - ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, path, true); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, path, true); byte[] data = TestUtility.rand((int) (ConfKeys.FS_MULTIPART_SIZE_DEFAULT * 2)); out.write(data); out.close(); @@ -144,7 +144,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase { @Test public void testDeleteSimplePutStagingFile() throws IOException { Path smallTxt = path("small.txt"); - ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, smallTxt, true); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, smallTxt, true); byte[] data = TestUtility.rand(4 << 20); out.write(data); for (StagingPart part : out.stagingParts()) { @@ -159,7 +159,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase { @Test public void testSimplePut() throws IOException { Path smallTxt = path("small.txt"); - ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, smallTxt, true); + ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, smallTxt, true); byte[] data = TestUtility.rand(4 << 20); out.write(data); out.close(); @@ -171,7 +171,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase { } public void testWrite(int uploadPartSize, int len) throws IOException { - Configuration newConf = new Configuration(protonConf); + Configuration newConf = new Configuration(tosConf); newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(FSUtils.scheme(conf, testDir.toUri())), uploadPartSize); @@ -207,7 +207,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase { public void testParallelWriteOneOutPutStreamImpl(int partSize, int epochs, 
      int batchSize) throws IOException, ExecutionException, InterruptedException {
-    Configuration newConf = new Configuration(protonConf);
+    Configuration newConf = new Configuration(tosConf);
     newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(FSUtils.scheme(conf, testDir.toUri())),
         partSize);
@@ -283,7 +283,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   }

   private void testMultipartThreshold(int partSize, int multipartThreshold, int dataSize) throws IOException {
-    Configuration newConf = new Configuration(protonConf);
+    Configuration newConf = new Configuration(tosConf);
     newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(scheme), partSize);
     newConf.setLong(ConfKeys.FS_MULTIPART_THRESHOLD.key(scheme), multipartThreshold);
     Path outPath = path(String.format("threshold-%d-%d-%d.txt", partSize, multipartThreshold, dataSize));
@@ -370,7 +370,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
     int partNum = 1;
     byte[] data = TestUtility.rand(len);

-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, outPath, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, outPath, true);
     try {
       out.write(data);
       out.close();
@@ -386,7 +386,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   public void testWriteClosedStream() throws IOException {
     byte[] data = TestUtility.rand(10);
     Path outPath = path("testWriteClosedStream.txt");
-    try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, outPath, true)) {
+    try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, outPath, true)) {
       out.close();
       out.write(data);
     } catch (IllegalStateException e) {
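
A note for anyone wiring these committers into a real job: the sketch below is a minimal, hypothetical example and is not part of this patch. It assumes the per-scheme binding key "mapreduce.outputcommitter.factory.scheme.tos", following Hadoop's PathOutputCommitterFactory convention (the same mechanism the S3A committers use), and it reuses the "mapred.output.committer.class" key that the mapred JobSuite above sets for the old API (newApiCommitter=false). The bucket name is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TosCommitterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // New MapReduce API: bind the per-scheme committer factory so that jobs writing to
    // tos:// paths pick up the object storage committer instead of FileOutputCommitter.
    conf.set("mapreduce.outputcommitter.factory.scheme.tos",
        "org.apache.hadoop.fs.tosfs.commit.CommitterFactory");
    // Old mapred API (newApiCommitter=false), mirroring what the mapred JobSuite sets above.
    conf.set("mapred.output.committer.class",
        "org.apache.hadoop.fs.tosfs.commit.mapred.Committer");

    Job job = Job.getInstance(conf, "tos-committer-example");
    // "example-bucket" is a placeholder; point this at a real TOS bucket.
    FileOutputFormat.setOutputPath(job, new Path("tos://example-bucket/output"));
    // Configure the mapper, reducer and input as usual, then submit:
    // System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}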