This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch HADOOP-19236-original
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit c7cd0ba4d699c003ed15c9777e629b6f87723107
Author: lijinglun <lijing...@bytedance.com>
AuthorDate: Tue Oct 29 21:39:42 2024 +0800

    Integration of TOS: Add Committer test.
---
 .../org/apache/hadoop/fs/tosfs/RawFileSystem.java  |   2 +-
 .../apache/hadoop/fs/tosfs/commit/CommitUtils.java |   2 +-
 .../hadoop/fs/tosfs/commit/CommitterFactory.java   |  33 ++
 .../hadoop/fs/tosfs/commit/mapred/Committer.java   |   4 +-
 .../apache/hadoop/fs/tosfs/common/ThreadPools.java |   4 +-
 .../org/apache/hadoop/fs/tosfs/conf/TosKeys.java   |  19 +-
 .../org/apache/hadoop/fs/tosfs/object/tos/TOS.java |   4 +-
 .../apache/hadoop/fs/tosfs/TestTosChecksum.java    |   2 +-
 .../hadoop/fs/tosfs/commit/BaseJobSuite.java       | 227 +++++++++++
 .../hadoop/fs/tosfs/commit/CommitterTestBase.java  | 422 +++++++++++++++++++++
 .../apache/hadoop/fs/tosfs/commit/JobSuite.java    | 219 +++++++++++
 .../hadoop/fs/tosfs/commit/MRJobTestBase.java      | 232 +++++++++++
 .../hadoop/fs/tosfs/commit/TestCommitter.java      |  29 ++
 .../apache/hadoop/fs/tosfs/commit/TestMRJob.java   |  49 +++
 .../fs/tosfs/commit/TestMagicOutputStream.java     |   2 +-
 .../fs/tosfs/commit/mapred/CommitterTestBase.java  | 364 ++++++++++++++++++
 .../hadoop/fs/tosfs/commit/mapred/JobSuite.java    | 228 +++++++++++
 .../fs/tosfs/commit/mapred/TestCommitter.java      |  29 ++
 .../fs/tosfs/object/ObjectStorageTestBase.java     |   6 +-
 .../fs/tosfs/object/TestObjectOutputStream.java    |  24 +-
 20 files changed, 1866 insertions(+), 35 deletions(-)

diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
index 8675ca7b774..b89a1477ee6 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
@@ -178,7 +178,7 @@ public class RawFileSystem extends FileSystem {
       if (fileStatus == null && FuseUtils.fuseEnabled()) {
         // The fuse requires the file to be visible when accessing getFileStatus once we created the file, so here we
         // close and commit the file to be visible explicitly for fuse, and then reopen the file output stream for
-        // further data bytes writing. For more details please see: https://code.byted.org/emr/proton/issues/825
+        // further data bytes writing.
         out.close();
         out = new ObjectOutputStream(storage, uploadThreadPool, getConf(), makeQualified(path), true);
       }
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java
index 2ea0277f544..845116344db 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java
@@ -329,7 +329,7 @@ public class CommitUtils {
     void visit(FileStatus f);
   }
 
-  public static boolean supportProtonCommit(Configuration conf, Path outputPath) {
+  public static boolean supportObjectStorageCommit(Configuration conf, Path outputPath) {
     return supportSchemes(conf).contains(outputPath.toUri().getScheme());
   }
 
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitterFactory.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitterFactory.java
new file mode 100644
index 00000000000..d85b1b5c9af
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitterFactory.java
@@ -0,0 +1,33 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+
+import java.io.IOException;
+
+public class CommitterFactory extends PathOutputCommitterFactory {
+
+  @Override
+  public PathOutputCommitter createOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return new Committer(outputPath, context);
+  }
+}
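
For orientation (not part of this patch): a minimal sketch of how a new-API MapReduce job could route committer creation through this factory, using Hadoop's standard PathOutputCommitterFactory switch. The output URI is a placeholder and the usual org.apache.hadoop imports (Configuration, Job, Path, FileOutputFormat, PathOutputCommitterFactory) are assumed.

    Configuration conf = new Configuration();
    // Build output committers through the TOS committer factory.
    conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS,
        "org.apache.hadoop.fs.tosfs.commit.CommitterFactory");
    Job job = Job.getInstance(conf);
    FileOutputFormat.setOutputPath(job, new Path("tos://my-bucket/output"));  // placeholder URI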
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java
index 9a2603942fd..614ee70ac34 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java
@@ -47,7 +47,7 @@ public class Committer extends FileOutputCommitter {
 
   private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(JobContext context) throws IOException {
     if(wrapped == null) {
-      wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context))
+      wrapped = CommitUtils.supportObjectStorageCommit(context.getConfiguration(), getOutputPath(context))
           ? new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context)
           : new org.apache.hadoop.mapred.FileOutputCommitter();
       LOG.debug("Using OutputCommitter implementation {}", wrapped.getClass().getName());
@@ -64,7 +64,7 @@ public class Committer extends FileOutputCommitter {
 
   private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(TaskAttemptContext context) throws IOException {
     if(wrapped == null) {
-      wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context))
+      wrapped = CommitUtils.supportObjectStorageCommit(context.getConfiguration(), getOutputPath(context))
           ? new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context)
           : new org.apache.hadoop.mapred.FileOutputCommitter();
     }
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java
index b69dd9c6d7f..df3bc81cd8e 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java
@@ -41,12 +41,12 @@ public class ThreadPools {
   private ThreadPools() {
   }
 
-  public static final String WORKER_THREAD_POOL_SIZE_PROP = "proton.worker.num-threads";
+  public static final String WORKER_THREAD_POOL_SIZE_PROP = "tos.worker.num-threads";
 
   public static final int WORKER_THREAD_POOL_SIZE =
       poolSize(Math.max(2, Runtime.getRuntime().availableProcessors()));
 
-  private static final ExecutorService WORKER_POOL = newWorkerPool("proton-default-worker-pool");
+  private static final ExecutorService WORKER_POOL = newWorkerPool("tos-default-worker-pool");
 
   public static ExecutorService defaultWorkerPool() {
     return WORKER_POOL;
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java
index 8c16306811e..7697deee00d 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java
@@ -109,15 +109,15 @@ public class TosKeys {
   public static final int FS_TOS_HTTP_CONNECT_TIMEOUT_MILLS_DEFAULT = 10000;
 
   /**
-   * The reading timeout when reading data from tos. Note that it is configured for the tos client,
-   * not proton.
+   * The reading timeout when reading data from tos. Note that it is configured for the tos client
+   * sdk, not hadoop-tos.
    */
   public static final String FS_TOS_HTTP_READ_TIMEOUT_MILLS = "fs.tos.http.readTimeoutMills";
   public static final int FS_TOS_HTTP_READ_TIMEOUT_MILLS_DEFAULT = 30000;
 
   /**
-   * The writing timeout when uploading data to tos. Note that it is configured for the tos client,
-   * not proton.
+   * The writing timeout when uploading data to tos. Note that it is configured for the tos client
+   * sdk, not hadoop-tos.
    */
   public static final String FS_TOS_HTTP_WRITE_TIMEOUT_MILLS = "fs.tos.http.writeTimeoutMills";
   public static final int FS_TOS_HTTP_WRITE_TIMEOUT_MILLS_DEFAULT = 30000;
@@ -152,11 +152,10 @@ public class TosKeys {
 
   /**
    * The prefix will be used as the product name in TOS SDK. The final user agent pattern is
-   * '{prefix}/Proton/{proton version}'.
-   * TODO: review it.
+   * '{prefix}/TOS_FS/{hadoop tos version}'.
    */
   public static final String FS_TOS_USER_AGENT_PREFIX = "fs.tos.user.agent.prefix";
-  public static final String FS_TOS_USER_AGENT_PREFIX_DEFAULT = "EMR";
+  public static final String FS_TOS_USER_AGENT_PREFIX_DEFAULT = "HADOOP-TOS";
 
   // TOS common keys.
   /**
@@ -187,7 +186,7 @@ public class TosKeys {
   public static final int FS_TOS_BATCH_DELETE_MAX_RETRIES_DEFAULT = 20;
 
   /**
-   * The codes from TOS deleteMultiObjects response, Proton will resend the batch delete request to
+   * The codes from TOS deleteMultiObjects response, client will resend the batch delete request to
    * delete the failed keys again if the response only contains these codes, otherwise won't send
    * request anymore.
    */
@@ -213,7 +212,7 @@ public class TosKeys {
   public static final int FS_TOS_LIST_OBJECTS_COUNT_DEFAULT = 1000;
 
   /**
-   * The maximum retry times of sending request via TOS client, Proton will resend the request if
+   * The maximum retry times of sending request via TOS client, client will resend the request if
    * got retryable exceptions, e.g. SocketException, UnknownHostException, SSLException,
    * InterruptedException, SocketTimeoutException, or got TOO_MANY_REQUESTS, INTERNAL_SERVER_ERROR
    * http codes.
@@ -232,7 +231,7 @@ public class TosKeys {
       TOSErrorCodes.FAST_FAILURE_CONFLICT_ERROR_CODES;
 
   /**
-   * The maximum retry times of reading object content via TOS client, Proton will resend the
+   * The maximum retry times of reading object content via TOS client, client will resend the
    * request to create a new input stream if getting unexpected end of stream error during reading
    * the input stream.
    */
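
For reference, a small sketch (not part of this patch) of tuning the TOS client SDK settings described above through a Hadoop Configuration. The values are illustrative; the key strings are the ones defined in TosKeys.

    Configuration conf = new Configuration();
    // TOS client SDK socket timeouts in milliseconds; the defaults are 30000.
    conf.setInt("fs.tos.http.readTimeoutMills", 60000);
    conf.setInt("fs.tos.http.writeTimeoutMills", 60000);
    // Product-name prefix reported in the SDK user agent; the default is "HADOOP-TOS".
    conf.set("fs.tos.user.agent.prefix", "HADOOP-TOS");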
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java
index a512e708584..06a52f0f7c7 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java
@@ -154,10 +154,10 @@ public class TOS implements DirectoryStorage {
   private BucketInfo bucketInfo;
 
   static {
-    org.apache.log4j.Logger logger = LogManager.getLogger("io.proton.shaded.com.volcengine.tos");
+    org.apache.log4j.Logger logger = LogManager.getLogger("com.volcengine.tos");
     String logLevel = System.getProperty("tos.log.level", "WARN");
 
-    LOG.debug("Reset the log level of io.proton.shaded.com.volcengine.tos with {} ", logLevel);
+    LOG.debug("Reset the log level of com.volcengine.tos with {} ", logLevel);
     logger.setLevel(Level.toLevel(logLevel.toUpperCase(), Level.WARN));
   }
 
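
The static block above reads the SDK log level from the "tos.log.level" JVM system property and defaults to WARN. A hedged example of raising it while debugging, assuming the property is set before the TOS class is first loaded:

    // Either on the JVM command line: -Dtos.log.level=DEBUG
    // or programmatically, before org.apache.hadoop.fs.tosfs.object.tos.TOS is initialized:
    System.setProperty("tos.log.level", "DEBUG");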
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java
index 1192ca5d7ab..938da20e4cb 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 public class TestTosChecksum {
-  private static final String FILE_STORE_ROOT = TempFiles.newTempDir("TestProtonChecksum");
+  private static final String FILE_STORE_ROOT = TempFiles.newTempDir("TestTosChecksum");
   private static final String ALGORITHM_NAME = "mock-algorithm";
   private static final String PREFIX = UUIDUtils.random();
 
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/BaseJobSuite.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/BaseJobSuite.java
new file mode 100644
index 00000000000..06c561b4cde
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/BaseJobSuite.java
@@ -0,0 +1,227 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.util.ParseUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public abstract class BaseJobSuite {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseJobSuite.class);
+  public static final int DEFAULT_APP_ATTEMPT_ID = 1;
+  protected static final Text KEY_1 = new Text("key1");
+  protected static final Text KEY_2 = new Text("key2");
+  protected static final Text VAL_1 = new Text("val1");
+  protected static final Text VAL_2 = new Text("val2");
+
+  protected Job job;
+  protected String jobId;
+  protected FileSystem fs;
+  protected Path outputPath;
+  protected ObjectStorage storage;
+
+  private final boolean dumpObjectStorage = ParseUtils.envAsBoolean("DUMP_OBJECT_STORAGE", false);
+
+  protected abstract Path magicPartPath();
+
+  protected abstract Path magicPendingSetPath();
+
+  protected abstract void assertSuccessMarker() throws IOException;
+
+  protected abstract void assertSummaryReport(Path reportDir) throws IOException;
+
+  protected abstract void assertNoTaskAttemptPath() throws IOException;
+
+  protected void assertMagicPathExist(Path outputPath) throws IOException {
+    Path magicPath = CommitUtils.magicPath(outputPath);
+    Assert.assertTrue(String.format("Magic path: %s should exist", magicPath), 
fs.exists(magicPath));
+  }
+
+  protected void assertMagicPathNotExist(Path outputPath) throws IOException {
+    Path magicPath = CommitUtils.magicPath(outputPath);
+    Assert.assertFalse(String.format("Magic path: %s should not exist", 
magicPath), fs.exists(magicPath));
+  }
+
+  protected abstract boolean skipTests();
+
+  public Path magicPendingPath() {
+    Path magicPart = magicPartPath();
+    return new Path(magicPart.getParent(), magicPart.getName() + ".pending");
+  }
+
+  public Path magicJobPath() {
+    return CommitUtils.magicPath(outputPath);
+  }
+
+  public String magicPartKey() {
+    return ObjectUtils.pathToKey(magicPartPath());
+  }
+
+  public String destPartKey() {
+    return MagicOutputStream.toDestKey(magicPartPath());
+  }
+
+  public FileSystem fs() {
+    return fs;
+  }
+
+  public ObjectStorage storage() {
+    return storage;
+  }
+
+  public Job job() {
+    return job;
+  }
+
+  public void assertHasMagicKeys() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    Assert.assertTrue("Should have some __magic object keys", Iterables.any(objects, o -> o.key().contains(
+        CommitUtils.MAGIC) && o.key().contains(jobId)));
+  }
+
+  public void assertHasBaseKeys() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    Assert.assertTrue("Should have some __base object keys", Iterables.any(objects, o -> o.key().contains(
+        CommitUtils.BASE) && o.key().contains(jobId)));
+  }
+
+  public void assertNoMagicPendingFile() {
+    String magicPendingKey = String.format("%s.pending", magicPartKey());
+    Assert.assertNull("Magic pending key should exist", 
storage.head(magicPendingKey));
+  }
+
+  public void assertHasMagicPendingFile() {
+    String magicPendingKey = String.format("%s.pending", magicPartKey());
+    Assert.assertNotNull("Magic pending key should exist", 
storage.head(magicPendingKey));
+  }
+
+  public void assertNoMagicMultipartUpload() {
+    Iterable<MultipartUpload> uploads = storage.listUploads(ObjectUtils.pathToKey(magicJobPath(), true));
+    boolean anyMagicUploads = Iterables.any(uploads, u -> u.key().contains(CommitUtils.MAGIC));
+    Assert.assertFalse("Should have no magic multipart uploads", anyMagicUploads);
+  }
+
+  public void assertNoMagicObjectKeys() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    boolean anyMagicUploads =
+        Iterables.any(objects, o -> o.key().contains(CommitUtils.MAGIC) && o.key().contains(jobId));
+    Assert.assertFalse("Should not have any magic keys", anyMagicUploads);
+  }
+
+  public void assertHasPendingSet() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    boolean anyPendingSet =
+        Iterables.any(objects, o -> o.key().contains(CommitUtils.PENDINGSET_SUFFIX) && o.key().contains(jobId));
+    Assert.assertTrue("Should have the expected .pendingset file", anyPendingSet);
+  }
+
+  public void assertPendingSetAtRightLocation() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    Path magicJobAttemptPath =
+        CommitUtils.magicJobAttemptPath(job().getJobID().toString(), DEFAULT_APP_ATTEMPT_ID, outputPath);
+    String inQualifiedPath = magicJobAttemptPath.toUri().getPath().substring(1);
+    Iterable<ObjectInfo> filtered =
+        Iterables.filter(objects, o -> o.key().contains(CommitUtils.PENDINGSET_SUFFIX) && o.key().contains(jobId));
+    boolean pendingSetAtRightLocation =
+        Iterables.any(filtered, o -> o.key().startsWith(inQualifiedPath) && o.key().contains(jobId));
+    Assert.assertTrue("The .pendingset file should locate at the job's magic output path.", pendingSetAtRightLocation);
+  }
+
+  public void assertMultipartUpload(int expectedUploads) {
+    // Note: be careful in the concurrent case: the callers need to check the same output path.
+    Iterable<MultipartUpload> uploads = storage.listUploads(ObjectUtils.pathToKey(outputPath, true));
+    long actualUploads = StreamSupport.stream(uploads.spliterator(), false).count();
+    Assert.assertEquals(expectedUploads, actualUploads);
+  }
+
+  public void assertPartFiles(int num) throws IOException {
+    FileStatus[] files = fs.listStatus(outputPath,
+        f -> !MagicOutputStream.isMagic(new Path(f.toUri())) && f.toUri().toString().contains("part-"));
+    Assert.assertEquals(num, files.length);
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(outputPath, true), "");
+    List<ObjectInfo> infos = Arrays.stream(Iterables.toArray(objects, ObjectInfo.class))
+        .filter(o -> o.key().contains("part-")).collect(Collectors.toList());
+    Assert.assertEquals(
+        String.format("Part files number should be %d, but got %d", num, infos.size()), num, infos.size());
+  }
+
+  public void assertNoPartFiles() throws IOException {
+    FileStatus[] files = fs.listStatus(outputPath,
+        f -> !MagicOutputStream.isMagic(new Path(f.toUri())) && f.toUri().toString().contains("part-"));
+    Assert.assertEquals(0, files.length);
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(outputPath, true), "");
+    boolean anyPartFile = Iterables.any(objects, o -> o.key().contains("part-"));
+    Assert.assertFalse("Should have no part files", anyPartFile);
+  }
+
+  public void dumpObjectStorage() {
+    if (dumpObjectStorage) {
+      LOG.info("===> Dump object storage - Start <===");
+      dumpObjectKeys();
+      dumpMultipartUploads();
+      LOG.info("===> Dump object storage -  End  <===");
+    }
+  }
+
+  public void dumpObjectKeys() {
+    String prefix = ObjectUtils.pathToKey(magicJobPath());
+    LOG.info("Dump object keys with prefix {}", prefix);
+    storage.listAll("", "").forEach(o -> LOG.info("Dump object keys - {}", o));
+  }
+
+  public void dumpMultipartUploads() {
+    String prefix = ObjectUtils.pathToKey(magicJobPath());
+    LOG.info("Dump multi part uploads with prefix {}", prefix);
+    storage.listUploads("")
+        .forEach(u -> LOG.info("Dump multipart uploads - {}", u));
+  }
+
+  public void verifyPartContent() throws IOException {
+    String partKey = destPartKey();
+    LOG.info("Part key to verify is: {}", partKey);
+    try (InputStream in = storage.get(partKey).stream()) {
+      byte[] data = IOUtils.toByteArray(in);
+      String expected = String.format("%s\t%s\n%s\t%s\n", KEY_1, VAL_1, KEY_2, VAL_2);
+      Assert.assertEquals(expected, new String(data, StandardCharsets.UTF_8));
+    }
+  }
+
+  public void assertSuccessMarkerNotExist() throws IOException {
+    Path succPath = CommitUtils.successMarker(outputPath);
+    Assert.assertFalse(String.format("%s should not exists", succPath), 
fs.exists(succPath));
+  }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/CommitterTestBase.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/CommitterTestBase.java
new file mode 100644
index 00000000000..8ccc4d837b9
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/CommitterTestBase.java
@@ -0,0 +1,422 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+
+public abstract class CommitterTestBase {
+  private Configuration conf;
+  private FileSystem fs;
+  private Path outputPath;
+  private TaskAttemptID job1Task0Attempt0;
+  private TaskAttemptID job2Task1Attempt0;
+  private Path reportDir;
+
+  @Before
+  public void setup() throws IOException {
+    conf = newConf();
+    fs = FileSystem.get(conf);
+    String uuid = UUIDUtils.random();
+    outputPath = fs.makeQualified(new Path("/test/" + uuid));
+    job1Task0Attempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 0, 0);
+    job2Task1Attempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 1, 0);
+
+    reportDir = fs.makeQualified(new Path("/report/" + uuid));
+    fs.mkdirs(reportDir);
+    conf.set(Committer.COMMITTER_SUMMARY_REPORT_DIR, reportDir.toUri().toString());
+  }
+
+  protected abstract Configuration newConf();
+
+  @After
+  public void teardown() {
+    CommonUtils.runQuietly(() -> fs.delete(outputPath, true));
+    IOUtils.closeStream(fs);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    List<String> committerThreads = Thread.getAllStackTraces().keySet()
+        .stream()
+        .map(Thread::getName)
+        .filter(n -> n.startsWith(Committer.THREADS_PREFIX))
+        .collect(Collectors.toList());
+    Assert.assertTrue("Outstanding committer threads", 
committerThreads.isEmpty());
+  }
+
+  private static String randomTrimmedJobId() {
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
+    return String.format("%s%04d_%04d", formatter.format(new Date()),
+        (long) (Math.random() * 1000),
+        (long) (Math.random() * 1000));
+  }
+
+  private static String randomFormedJobId() {
+    return String.format("job_%s", randomTrimmedJobId());
+  }
+
+  @Test
+  public void testSetupJob() throws IOException {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    // Setup job.
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+  }
+
+  @Test
+  public void testSetupJobWithOrphanPaths() throws IOException, InterruptedException {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    // Orphan success marker.
+    Path successPath = CommitUtils.successMarker(outputPath);
+    CommitUtils.save(fs, successPath, new byte[]{});
+    Assert.assertTrue(fs.exists(successPath));
+
+    // Orphan job path.
+    Path jobPath = CommitUtils.magicJobPath(suite.committer().jobId(), outputPath);
+    fs.mkdirs(jobPath);
+    Assert.assertTrue("The job path should exist", fs.exists(jobPath));
+    Path subPath = new Path(jobPath, "tmp.pending");
+    CommitUtils.save(fs, subPath, new byte[]{});
+    Assert.assertTrue("The sub path under the job path should exist.", fs.exists(subPath));
+    FileStatus jobPathStatus = fs.getFileStatus(jobPath);
+
+    Thread.sleep(1000L);
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    assertFalse("Should have deleted the success path", 
fs.exists(successPath));
+    Assert.assertTrue("Should have re-created the job path", 
fs.exists(jobPath));
+    assertFalse("Should have deleted the sub path under the job path", 
fs.exists(subPath));
+  }
+
+  @Test
+  public void testSetupTask() throws IOException {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    // Remaining attempt task path.
+    Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(suite.taskAttemptContext(), outputPath);
+    Path subTaskAttemptPath = new Path(taskAttemptBasePath, "tmp.pending");
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup job.
+    suite.setupJob();
+    suite.assertHasMagicKeys();
+    // It will clear all the job path once we've set up the job.
+    assertFalse(fs.exists(taskAttemptBasePath));
+    assertFalse(fs.exists(subTaskAttemptPath));
+
+    // Leave some task paths behind.
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup task.
+    suite.setupTask();
+    assertFalse(fs.exists(subTaskAttemptPath));
+  }
+
+  @Test
+  public void testCommitTask() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // Setup job
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    // Setup task
+    suite.setupTask();
+
+    // Write records.
+    suite.assertNoMagicPendingFile();
+    suite.assertMultipartUpload(0);
+    suite.writeOutput();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(1);
+    // Assert the pending file content.
+    Path pendingPath = suite.magicPendingPath();
+    byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
+    Pending pending = Pending.deserialize(pendingData);
+    assertEquals(suite.destPartKey(), pending.destKey());
+    assertEquals(20, pending.length());
+    assertEquals(1, pending.parts().size());
+
+    // Commit the task.
+    suite.commitTask();
+
+    // Verify the pending set file.
+    suite.assertHasPendingSet();
+    // Assert the pending set file content.
+    Path pendingSetPath = suite.magicPendingSetPath();
+    byte[] pendingSetData = CommitUtils.load(suite.fs(), pendingSetPath);
+    PendingSet pendingSet = PendingSet.deserialize(pendingSetData);
+    assertEquals(suite.job().getJobID().toString(), pendingSet.jobId());
+    assertEquals(1, pendingSet.commits().size());
+    assertEquals(pending, pendingSet.commits().get(0));
+    assertEquals(pendingSet.extraData(),
+        ImmutableMap.of(CommitUtils.TASK_ATTEMPT_ID, suite.taskAttemptContext().getTaskAttemptID().toString()));
+
+    // Complete the multipart upload and verify the results.
+    ObjectStorage storage = suite.storage();
+    storage.completeUpload(pending.destKey(), pending.uploadId(), pending.parts());
+    suite.verifyPartContent();
+  }
+
+  @Test
+  public void testAbortTask() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+
+    // Pre-check before the output write.
+    suite.assertNoMagicPendingFile();
+    suite.assertMultipartUpload(0);
+
+    // Execute the output write.
+    suite.writeOutput();
+
+    // Post-check after the output write.
+    suite.assertHasMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(1);
+    // Assert the pending file content.
+    Path pendingPath = suite.magicPendingPath();
+    byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
+    Pending pending = Pending.deserialize(pendingData);
+    assertEquals(suite.destPartKey(), pending.destKey());
+    assertEquals(20, pending.length());
+    assertEquals(1, pending.parts().size());
+
+    // Abort the task.
+    suite.abortTask();
+
+    // Verify the state after aborting task.
+    suite.assertNoMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(0);
+    suite.assertNoTaskAttemptPath();
+  }
+
+  @Test
+  public void testCommitJob() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job.
+    suite.assertNoPartFiles();
+    suite.commitJob();
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.assertSummaryReport(reportDir);
+    suite.verifyPartContent();
+  }
+
+
+  @Test
+  public void testCommitJobFailed() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job.
+    suite.assertNoPartFiles();
+    suite.commitJob();
+  }
+
+  @Test
+  public void testCommitJobSuccessMarkerFailed() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    CommitUtils.injectError("marker");
+    // Commit the job.
+    suite.assertNoPartFiles();
+    assertThrows("Expect commit job error.", IOException.class, 
suite::commitJob);
+    CommitUtils.removeError("marker");
+
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarkerNotExist();
+    assertEquals(0, suite.fs().listStatus(suite.outputPath).length);
+  }
+
+  @Test
+  public void testTaskCommitAfterJobCommit() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job
+    suite.assertNoPartFiles();
+    suite.commitJob();
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.verifyPartContent();
+
+    // Commit the task again.
+    assertThrows(FileNotFoundException.class, suite::commitTask);
+  }
+
+  @Test
+  public void testTaskCommitWithConsistentJobId() throws Exception {
+    Configuration conf = newConf();
+    String consistentJobId = randomFormedJobId();
+    conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId);
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // By now, we have two "jobId"s, one is spark uuid, and the other is the 
jobId in taskAttempt.
+    // The job committer will adopt the former.
+    suite.setupJob();
+
+    // Next, we clear spark uuid, and set the jobId of taskAttempt to another 
value. In this case,
+    // the committer will take the jobId of taskAttempt as the final jobId, 
which is not consistent
+    // with the one that committer holds.
+    conf.unset(CommitUtils.SPARK_WRITE_UUID);
+    String anotherJobId = randomTrimmedJobId();
+    TaskAttemptID taskAttemptId1 = JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+    final TaskAttemptContext attemptContext1 =
+        JobSuite.createTaskAttemptContext(conf, taskAttemptId1, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+
+    assertThrows("JobId set in the context", IllegalArgumentException.class,
+        () -> suite.setupTask(attemptContext1));
+
+    // Even though we use another taskAttempt, as long as we ensure the spark uuid is consistent,
+    // the jobId in committer is consistent.
+    conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId);
+    conf.set(FileOutputFormat.OUTDIR, outputPath.toString());
+    anotherJobId = randomTrimmedJobId();
+    TaskAttemptID taskAttemptId2 = JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+    TaskAttemptContext attemptContext2 =
+        JobSuite.createTaskAttemptContext(conf, taskAttemptId2, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+
+    suite.setupTask(attemptContext2);
+    // Write output must use the same task context with setup task.
+    suite.writeOutput(attemptContext2);
+    // Commit task must use the same task context with setup task.
+    suite.commitTask(attemptContext2);
+    suite.assertPendingSetAtRightLocation();
+
+    // Commit the job
+    suite.assertNoPartFiles();
+    suite.commitJob();
+
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.verifyPartContent();
+  }
+
+  @Test
+  public void testConcurrentJobs() throws Exception {
+    JobSuite suite1 = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    JobSuite suite2 = JobSuite.create(conf, job2Task1Attempt0, outputPath);
+    Assume.assumeFalse(suite1.skipTests());
+    Assume.assumeFalse(suite2.skipTests());
+    suite1.setupJob();
+    suite2.setupJob();
+    suite1.setupTask();
+    suite2.setupTask();
+    suite1.writeOutput();
+    suite2.writeOutput();
+    suite1.commitTask();
+    suite2.commitTask();
+
+    // Job2 commit the job.
+    suite2.assertNoPartFiles();
+    suite2.commitJob();
+    suite2.assertPartFiles(1);
+
+    suite2.assertNoMagicMultipartUpload();
+    suite2.assertNoMagicObjectKeys();
+    suite2.assertSuccessMarker();
+    suite2.assertSummaryReport(reportDir);
+    suite2.verifyPartContent();
+    suite2.assertMagicPathExist(outputPath);
+
+    // Job1 commit the job.
+    suite1.commitJob();
+    suite2.assertPartFiles(2);
+
+    // Verify the output.
+    suite1.assertNoMagicMultipartUpload();
+    suite1.assertNoMagicObjectKeys();
+    suite1.assertSuccessMarker();
+    suite1.assertSummaryReport(reportDir);
+    suite1.verifyPartContent();
+    suite1.assertMagicPathNotExist(outputPath);
+  }
+}
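
testTaskCommitWithConsistentJobId above relies on the committer preferring the Spark write UUID over the JobID embedded in the task attempt. A rough sketch of that setup under the same assumptions; the UUID value below is a placeholder.

    Configuration conf = new Configuration();
    // With the Spark write UUID set, committers created for different TaskAttemptIDs of the
    // same write resolve to one consistent jobId.
    conf.set(CommitUtils.SPARK_WRITE_UUID, "job_202410290001_0001");  // placeholder value
    // Unsetting it makes the committer fall back to the TaskAttemptID's own JobID, which the
    // test expects setupTask() to reject as inconsistent with the jobId captured at setupJob().
    // conf.unset(CommitUtils.SPARK_WRITE_UUID);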
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/JobSuite.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/JobSuite.java
new file mode 100644
index 00000000000..cba71c74802
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/JobSuite.java
@@ -0,0 +1,219 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.Assert;
+
+import java.io.IOException;
+
+public class JobSuite extends BaseJobSuite {
+  private static final CommitterFactory FACTORY = new CommitterFactory();
+  private final JobContext jobContext;
+  private final TaskAttemptContext taskAttemptContext;
+  private final Committer committer;
+
+  private JobSuite(FileSystem fs, Configuration conf, TaskAttemptID taskAttemptId, int appAttemptId, Path outputPath)
+      throws IOException {
+    this.fs = fs;
+    // Initialize the job instance.
+    this.job = Job.getInstance(conf);
+    job.setJobID(JobID.forName(CommitUtils.buildJobId(conf, taskAttemptId.getJobID())));
+    this.jobContext = createJobContext(job.getConfiguration(), taskAttemptId);
+    this.jobId = CommitUtils.buildJobId(jobContext);
+    this.taskAttemptContext = createTaskAttemptContext(job.getConfiguration(), taskAttemptId, appAttemptId);
+
+    // Set job output directory.
+    FileOutputFormat.setOutputPath(job, outputPath);
+    this.outputPath = outputPath;
+    this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), conf);
+
+    // Initialize committer.
+    this.committer = (Committer) FACTORY.createOutputCommitter(outputPath, taskAttemptContext);
+  }
+
+  public static JobSuite create(Configuration conf, TaskAttemptID taskAttemptId, Path outDir) throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    return new JobSuite(fs, conf, taskAttemptId, DEFAULT_APP_ATTEMPT_ID, outDir);
+  }
+
+  public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int attemptId) {
+    String attempt = String.format("attempt_%s_m_000000_%d", trimmedJobId, attemptId);
+    return TaskAttemptID.forName(attempt);
+  }
+
+  public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int taskId, int attemptId) {
+    String[] parts = trimmedJobId.split("_");
+    return new TaskAttemptID(parts[0], Integer.parseInt(parts[1]), TaskType.MAP, taskId, attemptId);
+  }
+
+  public static JobContext createJobContext(Configuration jobConf, TaskAttemptID taskAttemptId) {
+    return new JobContextImpl(jobConf, taskAttemptId.getJobID());
+  }
+
+  public static TaskAttemptContext createTaskAttemptContext(
+      Configuration jobConf, TaskAttemptID taskAttemptId, int appAttemptId) throws IOException {
+    // Set the key values for job configuration.
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId.toString());
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
+    jobConf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS, CommitterFactory.class.getName());
+    return new TaskAttemptContextImpl(jobConf, taskAttemptId);
+  }
+
+  public void setupJob() throws IOException {
+    committer.setupJob(jobContext);
+  }
+
+  public void setupTask() throws IOException {
+    committer.setupTask(taskAttemptContext);
+  }
+
+  // This method simulates the scenario that the job may set up task with a different
+  // taskAttemptContext, e.g., for a spark job.
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    committer.setupTask(taskAttemptContext);
+  }
+
+  public void writeOutput() throws Exception {
+    writeOutput(taskAttemptContext);
+  }
+
+  // This method simulates the scenario that the job may set up task with a different
+  // taskAttemptContext, e.g., for a spark job.
+  public void writeOutput(TaskAttemptContext taskAttemptContext) throws Exception {
+    RecordWriter<Object, Object> writer = new TextOutputFormat<>().getRecordWriter(taskAttemptContext);
+    NullWritable nullKey = NullWritable.get();
+    NullWritable nullVal = NullWritable.get();
+    Object[] keys = new Object[]{KEY_1, nullKey, null, nullKey, null, KEY_2};
+    Object[] vals = new Object[]{VAL_1, nullVal, null, null, nullVal, VAL_2};
+    try {
+      Assert.assertEquals(keys.length, vals.length);
+      for (int i = 0; i < keys.length; i++) {
+        writer.write(keys[i], vals[i]);
+      }
+    } finally {
+      writer.close(taskAttemptContext);
+    }
+  }
+
+  public boolean needsTaskCommit() {
+    return committer.needsTaskCommit(taskAttemptContext);
+  }
+
+  public void commitTask() throws IOException {
+    committer.commitTask(taskAttemptContext);
+  }
+
+  // This method simulates the scenario that the job may set up task with a different
+  // taskAttemptContext, e.g., for a spark job.
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    committer.commitTask(taskAttemptContext);
+  }
+
+  public void abortTask() throws IOException {
+    committer.abortTask(taskAttemptContext);
+  }
+
+  public void commitJob() throws IOException {
+    committer.commitJob(jobContext);
+  }
+
+  @Override
+  public Path magicPartPath() {
+    return new Path(committer.getWorkPath(), FileOutputFormat.getUniqueFile(taskAttemptContext, "part", ""));
+  }
+
+  @Override
+  public Path magicPendingSetPath() {
+    return CommitUtils.magicTaskPendingSetPath(taskAttemptContext, outputPath);
+  }
+
+  public TaskAttemptContext taskAttemptContext() {
+    return taskAttemptContext;
+  }
+
+  public Committer committer() {
+    return committer;
+  }
+
+  @Override
+  public void assertNoTaskAttemptPath() throws IOException {
+    Path path = CommitUtils.magicTaskAttemptBasePath(taskAttemptContext, outputPath);
+    Assert.assertFalse("Task attempt path should not exist", fs.exists(path));
+    String pathToKey = ObjectUtils.pathToKey(path);
+    Assert.assertNull("Should have no task attempt path key", storage.head(pathToKey));
+  }
+
+  @Override
+  protected boolean skipTests() {
+    return storage.bucket().isDirectory();
+  }
+
+  @Override
+  public void assertSuccessMarker() throws IOException {
+    Path succPath = CommitUtils.successMarker(outputPath);
+    Assert.assertTrue(String.format("%s should be exists", succPath), 
fs.exists(succPath));
+    SuccessData successData = SuccessData.deserialize(CommitUtils.load(fs, 
succPath));
+    Assert.assertEquals(SuccessData.class.getName(), successData.name());
+    Assert.assertTrue(successData.success());
+    Assert.assertEquals(NetUtils.getHostname(), successData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, successData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", 
taskAttemptContext.getTaskAttemptID()),
+        successData.description());
+    Assert.assertEquals(job.getJobID().toString(), successData.jobId());
+    Assert.assertEquals(1, successData.filenames().size());
+    Assert.assertEquals(destPartKey(), successData.filenames().get(0));
+  }
+
+  @Override
+  public void assertSummaryReport(Path reportDir) throws IOException {
+    Path reportPath = CommitUtils.summaryReport(reportDir, job().getJobID().toString());
+    Assert.assertTrue(String.format("%s should exist", reportPath), fs.exists(reportPath));
+    SuccessData reportData = SuccessData.deserialize(CommitUtils.load(fs, reportPath));
+    Assert.assertEquals(SuccessData.class.getName(), reportData.name());
+    Assert.assertTrue(reportData.success());
+    Assert.assertEquals(NetUtils.getHostname(), reportData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, reportData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", 
taskAttemptContext.getTaskAttemptID()),
+        reportData.description());
+    Assert.assertEquals(job.getJobID().toString(), reportData.jobId());
+    Assert.assertEquals(1, reportData.filenames().size());
+    Assert.assertEquals(destPartKey(), reportData.filenames().get(0));
+    Assert.assertEquals("clean", reportData.diagnostics().get("stage"));
+  }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/MRJobTestBase.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/MRJobTestBase.java
new file mode 100644
index 00000000000..5a5d30f63de
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/MRJobTestBase.java
@@ -0,0 +1,232 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.WordCount;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class MRJobTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(MRJobTestBase.class);
+
+  private static Configuration conf = new Configuration();
+  private static MiniMRYarnCluster yarnCluster;
+
+  private static FileSystem fs;
+
+  private static Path testDataPath;
+
+  public static void setConf(Configuration newConf) {
+    conf = newConf;
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
+    conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false);
+    conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
+
+    conf.set("mapreduce.outputcommitter.factory.scheme.tos",
+        CommitterFactory.class.getName()); // 3x newApiCommitter=true.
+    conf.set("mapred.output.committer.class",
+        Committer.class.getName()); // 2x and 3x newApiCommitter=false.
+    conf.set("mapreduce.outputcommitter.class",
+        org.apache.hadoop.fs.tosfs.commit.Committer.class.getName()); // 2x newApiCommitter=true.
+
+    // Start the yarn cluster.
+    yarnCluster = new MiniMRYarnCluster("yarn-" + System.currentTimeMillis(), 
2);
+    LOG.info("Default filesystem: {}", conf.get("fs.defaultFS"));
+    LOG.info("Default filesystem implementation: {}", 
conf.get("fs.AbstractFileSystem.tos.impl"));
+
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    fs = FileSystem.get(conf);
+    testDataPath = new Path("/mr-test-" + UUIDUtils.random())
+        .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException {
+    fs.delete(testDataPath, true);
+    if (yarnCluster != null) {
+      yarnCluster.stop();
+    }
+  }
+
+  @Before
+  public void before() throws IOException {
+  }
+
+  @After
+  public void after() throws IOException {
+  }
+
+  @Test
+  public void testTeraGen() throws Exception {
+    Path teraGenPath = new Path(testDataPath, "teraGen").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path output = new Path(teraGenPath, "output");
+    JobConf jobConf = new JobConf(yarnCluster.getConfig());
+    jobConf.addResource(conf);
+    jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
+    jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10);
+    jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false);
+
+    String[] args = new String[]{Integer.toString(1000), output.toString()};
+    int result = ToolRunner.run(jobConf, new TeraGen(), args);
+    Assert.assertEquals(String.format("teragen %s", StringUtils.join(" ", 
args)), 0, result);
+
+    // Verify the success data.
+    ObjectStorage storage = ObjectStorageFactory.create(
+        output.toUri().getScheme(), output.toUri().getAuthority(), conf);
+    int byteSizes = 0;
+
+    Path success = new Path(output, CommitUtils._SUCCESS);
+    byte[] serializedData = CommitUtils.load(fs, success);
+    SuccessData successData = SuccessData.deserialize(serializedData);
+    Assert.assertTrue("Should execute successfully", successData.success());
+    // Assert the destination paths.
+    Assert.assertEquals(2, successData.filenames().size());
+    successData.filenames().sort(String::compareTo);
+    Assert.assertEquals(ObjectUtils.pathToKey(new Path(output, "part-m-00000")),
+        successData.filenames().get(0));
+    Assert.assertEquals(ObjectUtils.pathToKey(new Path(output, "part-m-00001")),
+        successData.filenames().get(1));
+
+    for (String partFileKey : successData.filenames()) {
+      ObjectInfo objectInfo = storage.head(partFileKey);
+      Assert.assertNotNull("Output file should be existing", objectInfo);
+      byteSizes += objectInfo.size();
+    }
+
+    Assert.assertEquals(byteSizes, 100 /* Each row 100 bytes */ * 1000 /* total 1000 rows */);
+  }
+
+  @Test
+  public void testTeraSort() throws Exception {
+    Path teraGenPath = new Path(testDataPath, "teraGen").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path inputPath = new Path(teraGenPath, "output");
+    Path outputPath = new Path(teraGenPath, "sortOutput");
+    JobConf jobConf = new JobConf(yarnCluster.getConfig());
+    jobConf.addResource(conf);
+    jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
+    jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10);
+    jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false);
+    String[] args = new String[]{inputPath.toString(), outputPath.toString()};
+    int result = ToolRunner.run(jobConf, new TeraSort(), args);
+    Assert.assertEquals(String.format("terasort %s", StringUtils.join(" ", 
args)), 0, result);
+
+    // Verify the success data.
+    ObjectStorage storage = ObjectStorageFactory
+        .create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), conf);
+    int byteSizes = 0;
+
+    Path success = new Path(outputPath, CommitUtils._SUCCESS);
+    byte[] serializedData = CommitUtils.load(fs, success);
+    SuccessData successData = SuccessData.deserialize(serializedData);
+    Assert.assertTrue("Should execute successfully", successData.success());
+    // Assert the destination paths.
+    Assert.assertEquals(1, successData.filenames().size());
+    successData.filenames().sort(String::compareTo);
+    Assert.assertEquals(ObjectUtils.pathToKey(new Path(outputPath, 
"part-r-00000")), successData.filenames().get(0));
+
+    for (String partFileKey : successData.filenames()) {
+      ObjectInfo objectInfo = storage.head(partFileKey);
+      Assert.assertNotNull("Output file should be existing", objectInfo);
+      byteSizes += objectInfo.size();
+    }
+
+    Assert.assertEquals(100 /* bytes per row */ * 1000 /* rows */, byteSizes);
+  }
+
+  @Test
+  public void testWordCount() throws Exception {
+    Path wordCountPath = new Path(testDataPath, 
"wc").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path output = new Path(wordCountPath, "output");
+    Path input = new Path(wordCountPath, "input");
+    JobConf jobConf = new JobConf(yarnCluster.getConfig());
+    jobConf.addResource(conf);
+    jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
+    jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10);
+    jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false);
+
+    if (!fs.mkdirs(input)) {
+      throw new IOException("Mkdirs failed to create " + input.toString());
+    }
+
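+    // Seed a tiny input corpus: "a" appears twice, "b" and "c" once each.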
+    DataOutputStream file = fs.create(new Path(input, "part-0"));
+    file.writeBytes("a a b c");
+    file.close();
+
+    String[] args = new String[]{input.toString(), output.toString()};
+    int result = ToolRunner.run(jobConf, new WordCount(), args);
+    Assert.assertEquals(String.format("WordCount %s", StringUtils.join(" ", 
args)), 0, result);
+
+    // Verify the success path.
+    Assert.assertTrue(fs.exists(new Path(output, CommitUtils._SUCCESS)));
+    Assert.assertTrue(fs.exists(new Path(output, "part-00000")));
+
+    Path success = new Path(output, CommitUtils._SUCCESS);
+    Assert.assertTrue("Success file must be not empty", CommitUtils.load(fs, 
success).length != 0);
+
+    byte[] serializedData = CommitUtils.load(fs, new Path(output, 
"part-00000"));
+    String outputAsStr = new String(serializedData);
+    Map<String, Integer> resAsMap = getResultAsMap(outputAsStr);
+    Assert.assertEquals(2, (int) resAsMap.get("a"));
+    Assert.assertEquals(1, (int) resAsMap.get("b"));
+    Assert.assertEquals(1, (int) resAsMap.get("c"));
+  }
+
+  private Map<String, Integer> getResultAsMap(String outputAsStr) {
+    Map<String, Integer> result = new HashMap<>();
+    for (String line : outputAsStr.split("\n")) {
+      String[] tokens = line.split("\t");
+      Assert.assertTrue(String.format("Not enough tokens in in string %s from 
output %s", line, outputAsStr),
+          tokens.length > 1);
+      result.put(tokens[0], Integer.parseInt(tokens[1]));
+    }
+    return result;
+  }
+}
diff --git 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestCommitter.java
 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestCommitter.java
new file mode 100644
index 00000000000..e7176047f32
--- /dev/null
+++ 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestCommitter.java
@@ -0,0 +1,29 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.util.ParseUtils;
+
+public class TestCommitter extends CommitterTestBase {
+  @Override
+  protected Configuration newConf() {
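+    // The TOS_BUCKET environment variable is expected to name the test bucket.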
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", String.format("tos://%s", 
ParseUtils.envAsString("TOS_BUCKET", false)));
+    return conf;
+  }
+}
diff --git 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMRJob.java
 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMRJob.java
new file mode 100644
index 00000000000..d868380802b
--- /dev/null
+++ 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMRJob.java
@@ -0,0 +1,49 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.apache.hadoop.fs.tosfs.object.tos.TOS;
+import org.apache.hadoop.fs.tosfs.util.ParseUtils;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class TestMRJob extends MRJobTestBase {
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    // Create a new configuration and hand it to the IT case.
+    Configuration newConf = new Configuration();
+    newConf.set("fs.defaultFS", String.format("tos://%s", 
TestUtility.bucket()));
+    // Application in yarn cluster cannot read the environment variables from 
user bash, so here we
+    // set it into the config manually.
+    newConf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("tos"),
+        ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT, false));
+    newConf.set(TosKeys.FS_TOS_ACCESS_KEY_ID,
+        ParseUtils.envAsString(TOS.ENV_TOS_ACCESS_KEY_ID, false));
+    newConf.set(TosKeys.FS_TOS_SECRET_ACCESS_KEY,
+        ParseUtils.envAsString(TOS.ENV_TOS_SECRET_ACCESS_KEY, false));
+
+    MRJobTestBase.setConf(newConf);
+    // Continue preparing the IT case environment.
+    MRJobTestBase.beforeClass();
+  }
+}
diff --git 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java
 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java
index 0adba267667..b51505b47f8 100644
--- 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java
+++ 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java
@@ -186,7 +186,7 @@ public class TestMagicOutputStream extends 
ObjectStorageTestBase {
   private class TestingMagicOutputStream extends MagicOutputStream {
 
     TestingMagicOutputStream(Path magic) {
-      super(fs, storage, threadPool, protonConf, magic);
+      super(fs, storage, threadPool, tosConf, magic);
     }
 
     protected void persist(Path p, byte[] data) {
diff --git 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/CommitterTestBase.java
 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/CommitterTestBase.java
new file mode 100644
index 00000000000..3a94e3b0181
--- /dev/null
+++ 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/CommitterTestBase.java
@@ -0,0 +1,364 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
+import org.apache.hadoop.fs.tosfs.commit.Pending;
+import org.apache.hadoop.fs.tosfs.commit.PendingSet;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class CommitterTestBase {
+  private Configuration conf;
+  private FileSystem fs;
+  private Path outputPath;
+  private TaskAttemptID taskAttempt0;
+  private Path reportDir;
+
+  @Before
+  public void setup() throws IOException {
+    conf = newConf();
+    fs = FileSystem.get(conf);
+    String uuid = UUIDUtils.random();
+    outputPath = fs.makeQualified(new Path("/test/" + uuid));
+    taskAttempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 0);
+
+    reportDir = fs.makeQualified(new Path("/report/" + uuid));
+    fs.mkdirs(reportDir);
+    
conf.set(org.apache.hadoop.fs.tosfs.commit.Committer.COMMITTER_SUMMARY_REPORT_DIR,
+        reportDir.toUri().toString());
+  }
+
+  protected abstract Configuration newConf();
+
+  @After
+  public void teardown() {
+    CommonUtils.runQuietly(() -> fs.delete(outputPath, true));
+    IOUtils.closeStream(fs);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    List<String> committerThreads = Thread.getAllStackTraces().keySet()
+        .stream()
+        .map(Thread::getName)
+        .filter(n -> 
n.startsWith(org.apache.hadoop.fs.tosfs.commit.Committer.THREADS_PREFIX))
+        .collect(Collectors.toList());
+    Assert.assertTrue("Outstanding committer threads", 
committerThreads.isEmpty());
+  }
+
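+  // Builds a trimmed job id such as "202410290042_0817": a yyyyMMdd date plus two random 4-digit numbers.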
+  private static String randomTrimmedJobId() {
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
+    return String.format("%s%04d_%04d", formatter.format(new Date()),
+        (long) (Math.random() * 1000),
+        (long) (Math.random() * 1000));
+  }
+
+  private static String randomFormedJobId() {
+    return String.format("job_%s", randomTrimmedJobId());
+  }
+
+  @Test
+  public void testSetupJob() throws IOException {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // Setup job.
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+  }
+
+  @Test
+  public void testSetupJobWithOrphanPaths() throws IOException, 
InterruptedException {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // Orphan success marker.
+    Path successPath = CommitUtils.successMarker(outputPath);
+    CommitUtils.save(fs, successPath, new byte[]{});
+    Assert.assertTrue(fs.exists(successPath));
+
+    // Orphan job path.
+    Path jobPath = CommitUtils.magicJobPath(suite.committer().jobId(), 
outputPath);
+    fs.mkdirs(jobPath);
+    Assert.assertTrue("The job path should be existing", fs.exists(jobPath));
+    Path subPath = new Path(jobPath, "tmp.pending");
+    CommitUtils.save(fs, subPath, new byte[]{});
+    Assert.assertTrue("The sub path under job path should be existing.", 
fs.exists(subPath));
+    FileStatus jobPathStatus = fs.getFileStatus(jobPath);
+
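+    // Sleep briefly so a re-created job path would presumably carry a newer modification time
+    // than the orphan one captured above.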
+    Thread.sleep(1000L);
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    Assert.assertFalse("Should have deleted the success path", 
fs.exists(successPath));
+    Assert.assertTrue("Should have re-created the job path", 
fs.exists(jobPath));
+    Assert.assertFalse("Should have deleted the sub path under the job path", 
fs.exists(subPath));
+  }
+
+  @Test
+  public void testSetupTask() throws IOException {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // Simulate a leftover task attempt path.
+    Path taskAttemptBasePath = 
CommitUtils.magicTaskAttemptBasePath(suite.taskAttemptContext(), outputPath);
+    Path subTaskAttemptPath = new Path(taskAttemptBasePath, "tmp.pending");
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup job.
+    suite.setupJob();
+    suite.assertHasMagicKeys();
+    // Setting up the job clears the entire job path.
+    Assert.assertFalse(fs.exists(taskAttemptBasePath));
+    Assert.assertFalse(fs.exists(subTaskAttemptPath));
+
+    // Leave some task paths behind.
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup task.
+    suite.setupTask();
+    Assert.assertFalse(fs.exists(subTaskAttemptPath));
+  }
+
+  @Test
+  public void testCommitTask() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    // Setup job
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    // Setup task
+    suite.setupTask();
+
+    // Write records.
+    suite.assertNoMagicPendingFile();
+    suite.assertMultipartUpload(0);
+    suite.writeOutput();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(1);
+    // Assert the pending file content.
+    Path pendingPath = suite.magicPendingPath();
+    byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
+    Pending pending = Pending.deserialize(pendingData);
+    Assert.assertEquals(suite.destPartKey(), pending.destKey());
+    Assert.assertEquals(20, pending.length());
+    Assert.assertEquals(1, pending.parts().size());
+
+    // Commit the task.
+    suite.commitTask();
+
+    // Verify the pending set file.
+    suite.assertHasPendingSet();
+    // Assert the pending set file content.
+    Path pendingSetPath = suite.magicPendingSetPath();
+    byte[] pendingSetData = CommitUtils.load(suite.fs(), pendingSetPath);
+    PendingSet pendingSet = PendingSet.deserialize(pendingSetData);
+    Assert.assertEquals(suite.job().getJobID().toString(), pendingSet.jobId());
+    Assert.assertEquals(1, pendingSet.commits().size());
+    Assert.assertEquals(pending, pendingSet.commits().get(0));
+    Assert.assertEquals(pendingSet.extraData(),
+        ImmutableMap.of(CommitUtils.TASK_ATTEMPT_ID, 
suite.taskAttemptContext().getTaskAttemptID().toString()));
+
+    // Complete the multipart upload and verify the results.
+    ObjectStorage storage = suite.storage();
+    storage.completeUpload(pending.destKey(), pending.uploadId(), 
pending.parts());
+    suite.verifyPartContent();
+  }
+
+  @Test
+  public void testAbortTask() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+
+    // Pre-check before the output write.
+    suite.assertNoMagicPendingFile();
+    suite.assertMultipartUpload(0);
+
+    // Execute the output write.
+    suite.writeOutput();
+
+    // Post-check after the output write.
+    suite.assertHasMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(1);
+    // Assert the pending file content.
+    Path pendingPath = suite.magicPendingPath();
+    byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
+    Pending pending = Pending.deserialize(pendingData);
+    Assert.assertEquals(suite.destPartKey(), pending.destKey());
+    Assert.assertEquals(20, pending.length());
+    Assert.assertEquals(1, pending.parts().size());
+
+    // Abort the task.
+    suite.abortTask();
+
+    // Verify the state after aborting task.
+    suite.assertNoMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(0);
+    suite.assertNoTaskAttemptPath();
+  }
+
+  @Test
+  public void testCommitJob() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job.
+    suite.assertNoPartFiles();
+    suite.commitJob();
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.assertSummaryReport(reportDir);
+    suite.verifyPartContent();
+  }
+
+
+  @Test
+  public void testCommitJobFailed() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job.
+    suite.assertNoPartFiles();
+    suite.commitJob();
+  }
+
+  @Test
+  public void testTaskCommitAfterJobCommit() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job
+    suite.assertNoPartFiles();
+    suite.commitJob();
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.verifyPartContent();
+
+    // Commit the task again.
+    Assert.assertThrows(FileNotFoundException.class, suite::commitTask);
+  }
+
+  @Test
+  public void testTaskCommitWithConsistentJobId() throws Exception {
+    Configuration conf = newConf();
+    String consistentJobId = randomFormedJobId();
+    conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId);
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // At this point there are two "jobId"s: the spark uuid and the jobId in the taskAttempt.
+    // The job committer adopts the former.
+    suite.setupJob();
+
+    // Next, we clear the spark uuid and set the jobId of the taskAttempt to another value.
+    // In this case, the committer takes the jobId of the taskAttempt as the final jobId,
+    // which is not consistent with the one the committer already holds.
+    conf.unset(CommitUtils.SPARK_WRITE_UUID);
+    JobConf jobConf = new JobConf(conf);
+    String anotherJobId = randomTrimmedJobId();
+    TaskAttemptID taskAttemptId1 =
+        JobSuite.createTaskAttemptId(anotherJobId, 
JobSuite.DEFAULT_APP_ATTEMPT_ID);
+    final TaskAttemptContext attemptContext1 =
+        JobSuite.createTaskAttemptContext(jobConf, taskAttemptId1, 
JobSuite.DEFAULT_APP_ATTEMPT_ID);
+
+    Assert.assertThrows("JobId set in the context", 
IllegalArgumentException.class,
+        () -> suite.setupTask(attemptContext1));
+
+    // Even though we use another taskAttempt, as long as the spark uuid stays the same,
+    // the jobId in the committer remains consistent.
+    conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId);
+    conf.set(FileOutputFormat.OUTDIR, outputPath.toString());
+    jobConf = new JobConf(conf);
+    anotherJobId = randomTrimmedJobId();
+    TaskAttemptID taskAttemptId2 =
+        JobSuite.createTaskAttemptId(anotherJobId, 
JobSuite.DEFAULT_APP_ATTEMPT_ID);
+    TaskAttemptContext attemptContext2 =
+        JobSuite.createTaskAttemptContext(jobConf, taskAttemptId2, 
JobSuite.DEFAULT_APP_ATTEMPT_ID);
+
+    suite.setupTask(attemptContext2);
+    // Write output must use the same task context with setup task.
+    suite.writeOutput(attemptContext2);
+    // Commit task must use the same task context with setup task.
+    suite.commitTask(attemptContext2);
+    suite.assertPendingSetAtRightLocation();
+
+    // Commit the job
+    suite.assertNoPartFiles();
+    suite.commitJob();
+
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.verifyPartContent();
+  }
+}
diff --git 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/JobSuite.java
 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/JobSuite.java
new file mode 100644
index 00000000000..034bc3460a1
--- /dev/null
+++ 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/JobSuite.java
@@ -0,0 +1,228 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.mapred;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.commit.BaseJobSuite;
+import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
+import org.apache.hadoop.fs.tosfs.commit.SuccessData;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class JobSuite extends BaseJobSuite {
+  private static final Logger LOG = LoggerFactory.getLogger(JobSuite.class);
+  private final JobContext jobContext;
+  private final TaskAttemptContext taskAttemptContext;
+  private final Committer committer;
+
+  private JobSuite(FileSystem fs, JobConf conf,
+                   TaskAttemptID taskAttemptId, int appAttemptId, Path 
outputPath)
+      throws IOException {
+    this.fs = fs;
+    // Initialize the job instance.
+    this.job = Job.getInstance(conf);
+    job.setJobID(JobID.forName(CommitUtils.buildJobId(conf, 
taskAttemptId.getJobID())));
+    this.jobContext = createJobContext(conf, taskAttemptId);
+    this.taskAttemptContext = createTaskAttemptContext(conf, taskAttemptId, 
appAttemptId);
+    this.jobId = CommitUtils.buildJobId(jobContext);
+
+    // Set job output directory.
+    FileOutputFormat.setOutputPath(conf, outputPath);
+    this.outputPath = outputPath;
+    this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(),
+        outputPath.toUri().getAuthority(), conf);
+
+    // Initialize committer.
+    this.committer = new Committer();
+    this.committer.setupTask(taskAttemptContext);
+  }
+
+  public static JobSuite create(Configuration conf, TaskAttemptID 
taskAttemptId, Path outDir)
+      throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    return new JobSuite(fs, new JobConf(conf), taskAttemptId, 
DEFAULT_APP_ATTEMPT_ID, outDir);
+  }
+
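+  // Builds a map-task attempt id of the form "attempt_<trimmedJobId>_m_000000_<attemptId>".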
+  public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int 
attemptId) {
+    String attempt = String.format("attempt_%s_m_000000_%d", trimmedJobId, 
attemptId);
+    return TaskAttemptID.forName(attempt);
+  }
+
+  public static JobContext createJobContext(JobConf jobConf, TaskAttemptID 
taskAttemptId) {
+    return new JobContextImpl(jobConf, taskAttemptId.getJobID());
+  }
+
+  public static TaskAttemptContext createTaskAttemptContext(
+      JobConf jobConf, TaskAttemptID taskAttemptId, int appAttemptId) throws 
IOException {
+    // Set the key values for job configuration.
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId.toString());
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
+    jobConf.set("mapred.output.committer.class",
+        Committer.class.getName()); // 2x and 3x newApiCommitter=false.
+    return new TaskAttemptContextImpl(jobConf, taskAttemptId);
+  }
+
+  public void setupJob() throws IOException {
+    committer.setupJob(jobContext);
+  }
+
+  public void setupTask() throws IOException {
+    committer.setupTask(taskAttemptContext);
+  }
+
+  // This method simulates the scenario where the job sets up the task with a different
+  // taskAttemptContext, e.g. for a Spark job.
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws 
IOException {
+    committer.setupTask(taskAttemptContext);
+  }
+
+  public void writeOutput() throws Exception {
+    writeOutput(taskAttemptContext);
+  }
+
+  // This method simulates the scenario where the job sets up the task with a different
+  // taskAttemptContext, e.g. for a Spark job.
+  public void writeOutput(TaskAttemptContext taskAttemptContext) throws 
Exception {
+    RecordWriter<Object, Object> writer = new 
TextOutputFormat<>().getRecordWriter(fs,
+        taskAttemptContext.getJobConf(),
+        CommitUtils.buildJobId(taskAttemptContext),
+        taskAttemptContext.getProgressible());
+    NullWritable nullKey = NullWritable.get();
+    NullWritable nullVal = NullWritable.get();
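+    // Mix regular keys/values with nulls and NullWritables to exercise the writer's null handling.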
+    Object[] keys = new Object[]{KEY_1, nullKey, null, nullKey, null, KEY_2};
+    Object[] vals = new Object[]{VAL_1, nullVal, null, null, nullVal, VAL_2};
+    try {
+      Assert.assertEquals(keys.length, vals.length);
+      for (int i = 0; i < keys.length; i++) {
+        writer.write(keys[i], vals[i]);
+      }
+    } finally {
+      writer.close(Reporter.NULL);
+    }
+  }
+
+  public boolean needsTaskCommit() throws IOException {
+    return committer.needsTaskCommit(taskAttemptContext);
+  }
+
+  public void commitTask() throws IOException {
+    committer.commitTask(taskAttemptContext);
+  }
+
+  // This method simulates the scenario where the job sets up the task with a different
+  // taskAttemptContext, e.g. for a Spark job.
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws 
IOException {
+    committer.commitTask(taskAttemptContext);
+  }
+
+  public void abortTask() throws IOException {
+    committer.abortTask(taskAttemptContext);
+  }
+
+  public void commitJob() throws IOException {
+    committer.commitJob(jobContext);
+  }
+
+  @Override
+  public Path magicPartPath() {
+    return new Path(committer.getWorkPath(), committer.jobId());
+  }
+
+  @Override
+  public Path magicPendingSetPath() {
+    return CommitUtils.magicTaskPendingSetPath(taskAttemptContext, outputPath);
+  }
+
+  public TaskAttemptContext taskAttemptContext() {
+    return taskAttemptContext;
+  }
+
+  public Committer committer() {
+    return committer;
+  }
+
+  @Override
+  public void assertNoTaskAttemptPath() throws IOException {
+    Path path = CommitUtils.magicTaskAttemptBasePath(taskAttemptContext, 
outputPath);
+    Assert.assertFalse("Task attempt path should be not existing", 
fs.exists(path));
+    String pathToKey = ObjectUtils.pathToKey(path);
+    Assert.assertNull("Should have no task attempt path key", 
storage.head(pathToKey));
+  }
+
+  @Override
+  protected boolean skipTests() {
+    return storage.bucket().isDirectory();
+  }
+
+  @Override
+  public void assertSuccessMarker() throws IOException {
+    Path succPath = CommitUtils.successMarker(outputPath);
+    Assert.assertTrue(String.format("%s should be exists", succPath), 
fs.exists(succPath));
+    SuccessData successData = SuccessData.deserialize(CommitUtils.load(fs, 
succPath));
+    Assert.assertEquals(SuccessData.class.getName(), successData.name());
+    Assert.assertTrue(successData.success());
+    Assert.assertEquals(NetUtils.getHostname(), successData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, successData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", 
taskAttemptContext.getTaskAttemptID()),
+        successData.description());
+    Assert.assertEquals(job.getJobID().toString(), successData.jobId());
+    Assert.assertEquals(1, successData.filenames().size());
+    Assert.assertEquals(destPartKey(), successData.filenames().get(0));
+  }
+
+  @Override
+  public void assertSummaryReport(Path reportDir) throws IOException {
+    Path reportPath = CommitUtils.summaryReport(reportDir, 
job().getJobID().toString());
+    Assert.assertTrue(String.format("%s should be exists", reportPath), 
fs.exists(reportPath));
+    SuccessData reportData = SuccessData.deserialize(CommitUtils.load(fs, 
reportPath));
+    Assert.assertEquals(SuccessData.class.getName(), reportData.name());
+    Assert.assertTrue(reportData.success());
+    Assert.assertEquals(NetUtils.getHostname(), reportData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, reportData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", 
taskAttemptContext.getTaskAttemptID()),
+        reportData.description());
+    Assert.assertEquals(job.getJobID().toString(), reportData.jobId());
+    Assert.assertEquals(1, reportData.filenames().size());
+    Assert.assertEquals(destPartKey(), reportData.filenames().get(0));
+    Assert.assertEquals("clean", reportData.diagnostics().get("stage"));
+  }
+}
diff --git 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/TestCommitter.java
 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/TestCommitter.java
new file mode 100644
index 00000000000..48f90ec3d30
--- /dev/null
+++ 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/TestCommitter.java
@@ -0,0 +1,29 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+
+public class TestCommitter extends CommitterTestBase {
+  @Override
+  protected Configuration newConf() {
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", String.format("tos://%s", TestUtility.bucket()));
+    return conf;
+  }
+}
diff --git 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java
 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java
index a3677a54df1..266f704bf6f 100644
--- 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java
+++ 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java
@@ -38,7 +38,7 @@ import java.io.IOException;
 public class ObjectStorageTestBase {
   private static final Logger LOG = 
LoggerFactory.getLogger(ObjectStorageTestBase.class);
   protected Configuration conf;
-  protected Configuration protonConf;
+  protected Configuration tosConf;
   protected Path testDir;
   protected FileSystem fs;
   protected String scheme;
@@ -55,14 +55,14 @@ public class ObjectStorageTestBase {
     conf = new Configuration();
     conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("filestore"), 
tempDirPath);
     conf.set("fs.filestore.impl", LocalFileSystem.class.getName());
-    protonConf = new Configuration(conf);
+    tosConf = new Configuration(conf);
     // Set the environment variable for ObjectTestUtils#assertObject
     TestUtility.setSystemEnv(FileStore.ENV_FILE_STORAGE_ROOT, tempDirPath);
 
     testDir = new Path("filestore://" + FileStore.DEFAULT_BUCKET + "/", 
UUIDUtils.random());
     fs = testDir.getFileSystem(conf);
     scheme = testDir.toUri().getScheme();
-    storage = ObjectStorageFactory.create(scheme, 
testDir.toUri().getAuthority(), protonConf);
+    storage = ObjectStorageFactory.create(scheme, 
testDir.toUri().getAuthority(), tosConf);
   }
 
   @After
diff --git 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java
 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java
index ac450d6d958..ff1dd89d186 100644
--- 
a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java
+++ 
b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java
@@ -73,7 +73,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
       for (int i = 0; i < 3; i++) {
         tmpDirs.add(tmp.newDir());
       }
-      Configuration newConf = new Configuration(protonConf);
+      Configuration newConf = new Configuration(tosConf);
       newConf.set(ConfKeys.FS_MULTIPART_STAGING_DIR.key("filestore"), 
Joiner.on(",").join(tmpDirs));
 
       // Start multiple threads to open streams to create staging dir.
@@ -91,7 +91,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
   @Test
   public void testWriteZeroByte() throws IOException {
     Path zeroByteTxt = path("zero-byte.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
protonConf, zeroByteTxt, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
tosConf, zeroByteTxt, true);
     // write zero-byte and close.
     out.write(new byte[0], 0, 0);
     out.close();
@@ -104,7 +104,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
   @Test
   public void testWriteZeroByteWithoutAllowPut() throws IOException {
     Path zeroByteTxt = path("zero-byte-without-allow-put.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
protonConf, zeroByteTxt, false);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
tosConf, zeroByteTxt, false);
     // write zero-byte and close.
     out.close();
     assertStagingPart(0, out.stagingParts());
@@ -116,7 +116,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
   @Test
   public void testDeleteStagingFileWhenUploadPartsOK() throws IOException {
     Path path = path("data.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
protonConf, path, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
tosConf, path, true);
     byte[] data = TestUtility.rand((int) (ConfKeys.FS_MULTIPART_SIZE_DEFAULT * 
2));
     out.write(data);
     out.waitForPartsUpload();
@@ -132,7 +132,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
   @Test
   public void testDeleteStagingFileWithClose() throws IOException {
     Path path = path("data.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
protonConf, path, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
tosConf, path, true);
     byte[] data = TestUtility.rand((int) (ConfKeys.FS_MULTIPART_SIZE_DEFAULT * 
2));
     out.write(data);
     out.close();
@@ -144,7 +144,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
   @Test
   public void testDeleteSimplePutStagingFile() throws IOException {
     Path smallTxt = path("small.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
protonConf, smallTxt, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
tosConf, smallTxt, true);
     byte[] data = TestUtility.rand(4 << 20);
     out.write(data);
     for (StagingPart part : out.stagingParts()) {
@@ -159,7 +159,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
   @Test
   public void testSimplePut() throws IOException {
     Path smallTxt = path("small.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
protonConf, smallTxt, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
tosConf, smallTxt, true);
     byte[] data = TestUtility.rand(4 << 20);
     out.write(data);
     out.close();
@@ -171,7 +171,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
   }
 
   public void testWrite(int uploadPartSize, int len) throws IOException {
-    Configuration newConf = new Configuration(protonConf);
+    Configuration newConf = new Configuration(tosConf);
     newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(FSUtils.scheme(conf, 
testDir.toUri())),
         uploadPartSize);
 
@@ -207,7 +207,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
 
   public void testParallelWriteOneOutPutStreamImpl(int partSize, int epochs, 
int batchSize)
       throws IOException, ExecutionException, InterruptedException {
-    Configuration newConf = new Configuration(protonConf);
+    Configuration newConf = new Configuration(tosConf);
     newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(FSUtils.scheme(conf, 
testDir.toUri())),
         partSize);
 
@@ -283,7 +283,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
   }
 
   private void testMultipartThreshold(int partSize, int multipartThreshold, 
int dataSize) throws IOException {
-    Configuration newConf = new Configuration(protonConf);
+    Configuration newConf = new Configuration(tosConf);
     newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(scheme), partSize);
     newConf.setLong(ConfKeys.FS_MULTIPART_THRESHOLD.key(scheme), 
multipartThreshold);
     Path outPath = path(String.format("threshold-%d-%d-%d.txt", partSize, 
multipartThreshold, dataSize));
@@ -370,7 +370,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
     int partNum = 1;
 
     byte[] data = TestUtility.rand(len);
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
protonConf, outPath, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
tosConf, outPath, true);
     try {
       out.write(data);
       out.close();
@@ -386,7 +386,7 @@ public class TestObjectOutputStream extends 
ObjectStorageTestBase {
   public void testWriteClosedStream() throws IOException {
     byte[] data = TestUtility.rand(10);
     Path outPath = path("testWriteClosedStream.txt");
-    try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
protonConf, outPath, true)) {
+    try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, 
tosConf, outPath, true)) {
       out.close();
       out.write(data);
     } catch (IllegalStateException e) {

