This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch HADOOP-19236-original
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 3f6662dd064858dac591e35bda83de3dc9a98bab
Author: lijinglun <lijing...@bytedance.com>
AuthorDate: Thu Oct 17 21:14:56 2024 +0800

    Integration of TOS: Add tos magic Committer.
---
 hadoop-cloud-storage-project/hadoop-tos/pom.xml    |  17 +
 .../hadoop/fs/tosfs/commit/CommitContext.java      |  45 ++
 .../apache/hadoop/fs/tosfs/commit/CommitUtils.java | 359 +++++++++++++++
 .../apache/hadoop/fs/tosfs/commit/Committer.java   | 499 +++++++++++++++++++++
 .../org/apache/hadoop/fs/tosfs/commit/Pending.java | 181 ++++++++
 .../apache/hadoop/fs/tosfs/commit/PendingSet.java  | 123 +++++
 .../apache/hadoop/fs/tosfs/commit/SuccessData.java | 238 ++++++++++
 .../hadoop/fs/tosfs/commit/mapred/Committer.java   | 184 ++++++++
 .../hadoop/fs/tosfs/commit/ops/PendingOps.java     |  43 ++
 .../fs/tosfs/commit/ops/PendingOpsFactory.java     |  40 ++
 .../hadoop/fs/tosfs/commit/ops/RawPendingOps.java  |  54 +++
 .../org/apache/hadoop/fs/tosfs/conf/ConfKeys.java  |   6 +
 .../org/apache/hadoop/fs/tosfs/util/JsonCodec.java |  46 ++
 .../apache/hadoop/fs/tosfs/util/Serializer.java    |  25 ++
 14 files changed, 1860 insertions(+)
diff --git a/hadoop-cloud-storage-project/hadoop-tos/pom.xml b/hadoop-cloud-storage-project/hadoop-tos/pom.xml index 6d6bb49d1a4..53e80c374f8 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/pom.xml +++ b/hadoop-cloud-storage-project/hadoop-tos/pom.xml @@ -43,12 +43,29 @@ <artifactId>hadoop-common</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>test</scope> <type>test-jar</type> </dependency> + <!-- Artifacts needed to bring up a Mini MR Yarn cluster--> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-examples</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>com.volcengine</groupId> diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitContext.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitContext.java new file mode 100644 index 00000000000..1dc67bc27c3 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitContext.java @@ -0,0 +1,45 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; + +import java.util.List; + +public class CommitContext { + private final List<FileStatus> pendingSets; + // It will be accessed by multiple threads; please access it in a thread-safe context.
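+ // (The synchronized addDestKey/destKeys accessors below provide that thread safety.)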
+ private final List<String> destKeys; + + public CommitContext(List<FileStatus> pendingSets) { + this.pendingSets = pendingSets; + this.destKeys = Lists.newArrayList(); + } + + public List<FileStatus> pendingSets() { + return pendingSets; + } + + public synchronized void addDestKey(String destKey) { + destKeys.add(destKey); + } + + public synchronized List<String> destKeys() { + return destKeys; + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java new file mode 100644 index 00000000000..2ea0277f544 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java @@ -0,0 +1,359 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.tosfs.commit.mapred.Committer; +import org.apache.hadoop.fs.tosfs.util.Serializer; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Supplier; + +public class CommitUtils { + private CommitUtils() { + } + + public static final String COMMITTER_NAME = Committer.class.getName(); + + /** + * Support scheme for tos committer. + */ + public static final String FS_STORAGE_OBJECT_SCHEME = "fs.object-storage.scheme"; + public static final String DEFAULT_FS_STORAGE_OBJECT_SCHEME = "tos,oss,s3,s3a,s3n,obs,filestore"; + + /** + * Path for "magic" writes: path and {@link #PENDING_SUFFIX} files: {@value}. + */ + public static final String MAGIC = "__magic"; + + /** + * Marker of the start of a directory tree for calculating the final path names: {@value}. + */ + public static final String BASE = "__base"; + + /** + * Suffix applied to pending commit metadata: {@value}. + */ + public static final String PENDING_SUFFIX = ".pending"; + + /** + * Suffix applied to multiple pending commit metadata: {@value}. 
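+ * For example, a committed task saves its merged commits to + * ${jobOutput}/__magic/${jobId}/${appAttemptId}/${taskId}.pendingset, see magicTaskPendingSetPath below.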
+ */ + public static final String PENDINGSET_SUFFIX = ".pendingset"; + + /** + * Marker file to create on success: {@value}. + */ + public static final String _SUCCESS = "_SUCCESS"; + + /** + * Format string used to build a summary file from a Job ID. + */ + public static final String SUMMARY_FILENAME_FORMAT = "summary-%s.json"; + + /** + * Extra Data key for task attempt in pendingset files. + */ + public static final String TASK_ATTEMPT_ID = "task.attempt.id"; + + /** + * The UUID for jobs: {@value}. + * This was historically created in Spark 1.x's SQL queries, see SPARK-33230. + */ + public static final String SPARK_WRITE_UUID = "spark.sql.sources.writeJobUUID"; + + /** + * Get the magic location for the output path. + * Format: ${out}/__magic + * + * @param out the base output directory. + * @return the location of magic job attempts. + */ + public static Path magicPath(Path out) { + return new Path(out, MAGIC); + } + + /** + * Compute the "magic" path for a job. <br> + * Format: ${jobOutput}/__magic/${jobId} + * + * @param jobId unique Job ID. + * @param jobOutput the final output directory. + * @return the path to store job attempt data. + */ + public static Path magicJobPath(String jobId, Path jobOutput) { + return new Path(magicPath(jobOutput), jobId); + } + + /** + * Get the Application Attempt ID for this job. + * + * @param context the context to look in + * @return the Application Attempt ID for a given job, or 0 + */ + public static int appAttemptId(JobContext context) { + return context.getConfiguration().getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0); + } + + /** + * Compute the "magic" path for a job attempt. <br> + * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId} + * + * @param jobId unique Job ID. + * @param appAttemptId the ID of the application attempt for this job. + * @param jobOutput the final output directory. + * @return the path to store job attempt data. + */ + public static Path magicJobAttemptPath(String jobId, int appAttemptId, Path jobOutput) { + return new Path(magicPath(jobOutput), formatAppAttemptDir(jobId, appAttemptId)); + } + + /** + * Compute the "magic" path for a job attempt. <br> + * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId} + * + * @param context the context of the job. + * @param jobOutput the final output directory. + * @return the path to store job attempt data. + */ + public static Path magicJobAttemptPath(JobContext context, Path jobOutput) { + String jobId = buildJobId(context); + return magicJobAttemptPath(jobId, appAttemptId(context), jobOutput); + } + + private static String formatAppAttemptDir(String jobId, int appAttemptId) { + return String.format("%s/%02d", jobId, appAttemptId); + } + + /** + * Compute the path where the output of magic task attempts are stored. <br> + * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks + * + * @param jobId unique Job ID. + * @param jobOutput The output path to commit work into. + * @param appAttemptId the ID of the application attempt for this job. + * @return the path where the output of magic task attempts are stored. + */ + public static Path magicTaskAttemptsPath(String jobId, Path jobOutput, int appAttemptId) { + return new Path(magicJobAttemptPath(jobId, appAttemptId, jobOutput), "tasks"); + } + + /** + * Compute the path where the output of a task attempt is stored until that task is committed. + * This path is marked as a base path for relocations, so subdirectory information is preserved. 
+ * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}/__base + * + * @param context the context of the task attempt. + * @param jobId unique Job ID. + * @param jobOutput The output path to commit work into. + * @return the path where a task attempt should be stored. + */ + public static Path magicTaskAttemptBasePath(TaskAttemptContext context, String jobId, Path jobOutput) { + return new Path(magicTaskAttemptPath(context, jobId, jobOutput), BASE); + } + + /** + * Compute the path where the output of a task attempt is stored until that task is committed. + * This path is marked as a base path for relocations, so subdirectory information is preserved. + * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}/__base + * + * @param context the context of the task attempt. + * @param jobOutput The output path to commit work into. + * @return the path where a task attempt should be stored. + */ + public static Path magicTaskAttemptBasePath(TaskAttemptContext context, Path jobOutput) { + String jobId = buildJobId(context); + return magicTaskAttemptBasePath(context, jobId, jobOutput); + } + + /** + * Get the magic task attempt path, without any annotations to mark relative references. + * If there is an app attempt property in the context configuration, that is included. + * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId} + * + * @param context the context of the task attempt. + * @param jobId unique Job ID. + * @param jobOutput The output path to commit work into. + * @return the path under which all attempts go. + */ + public static Path magicTaskAttemptPath(TaskAttemptContext context, String jobId, Path jobOutput) { + return new Path( + magicTaskAttemptsPath(jobId, jobOutput, appAttemptId(context)), + String.valueOf(context.getTaskAttemptID())); + } + + /** + * Get the magic task attempt path, without any annotations to mark relative references. + * If there is an app attempt property in the context configuration, that is included. + * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId} + * + * @param context the context of the task attempt. + * @param jobOutput The output path to commit work into. + * @return the path under which all attempts go. + */ + public static Path magicTaskAttemptPath(TaskAttemptContext context, Path jobOutput) { + String jobId = buildJobId(context); + return magicTaskAttemptPath(context, jobId, jobOutput); + } + + /** + * Get the magic task pendingset path. + * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/${taskId}.pendingset + * + * @param context the context of the task attempt. + * @param jobOutput The output path to commit work into. + */ + public static Path magicTaskPendingSetPath(TaskAttemptContext context, Path jobOutput) { + String taskId = String.valueOf(context.getTaskAttemptID().getTaskID()); + return new Path(magicJobAttemptPath(context, jobOutput), String.format("%s%s", taskId, PENDINGSET_SUFFIX)); + } + + public static String buildJobId(Configuration conf, JobID jobId) { + String jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, ""); + if (!jobUUID.isEmpty()) { + if (jobUUID.startsWith(JobID.JOB)) { + return jobUUID; + } else { + return String.format("%s_%s", JobID.JOB, jobUUID); + } + } + + // if no other option was supplied, return the job ID. + // This is exactly what MR jobs expect, but is not what + // Spark jobs can do as there is a risk of jobID collision. + return jobId != null ? 
jobId.toString() : "NULL_JOB_ID"; + } + + public static String buildJobId(JobContext context) { + return buildJobId(context.getConfiguration(), context.getJobID()); + } + + /** + * Get the job name; returns a meaningful placeholder if no name is set. + * + * @param context job context + * @return a string for logs + */ + public static String jobName(JobContext context) { + String name = context.getJobName(); + return (name != null && !name.isEmpty()) ? name : "(anonymous)"; + } + + /** + * Format: ${output}/_SUCCESS + */ + public static Path successMarker(Path output) { + return new Path(output, _SUCCESS); + } + + /** + * Format: ${reportDir}/summary-xxxxx.json + */ + public static Path summaryReport(Path reportDir, String jobId) { + return new Path(reportDir, String.format(SUMMARY_FILENAME_FORMAT, jobId)); + } + + public static void save(FileSystem fs, Path path, byte[] data) throws IOException { + // By default, fs.create(path) will create parent folders recursively, and overwrite + // the file if it already exists. + try (FSDataOutputStream out = fs.create(path)) { + IOUtils.copy(new ByteArrayInputStream(data), out); + } + } + + public static void save(FileSystem fs, Path path, Serializer instance) throws IOException { + save(fs, path, instance.serialize()); + } + + public static byte[] load(FileSystem fs, Path path) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (FSDataInputStream in = fs.open(path)) { + IOUtils.copy(in, out); + } + return out.toByteArray(); + } + + public static List<FileStatus> listPendingFiles(FileSystem fs, Path dir) throws IOException { + List<FileStatus> pendingFiles = Lists.newArrayList(); + CommitUtils.listFiles(fs, dir, true, f -> { + if (f.getPath().toString().endsWith(CommitUtils.PENDING_SUFFIX)) { + pendingFiles.add(f); + } + }); + return pendingFiles; + } + + public static void listFiles(FileSystem fs, Path dir, boolean recursive, FileVisitor visitor) throws IOException { + RemoteIterator<LocatedFileStatus> iter = fs.listFiles(dir, recursive); + while (iter.hasNext()) { + FileStatus f = iter.next(); + visitor.visit(f); + } + } + + public interface FileVisitor { + void visit(FileStatus f); + } + + public static boolean supportProtonCommit(Configuration conf, Path outputPath) { + return supportSchemes(conf).contains(outputPath.toUri().getScheme()); + } + + private static List<String> supportSchemes(Configuration conf) { + String schemes = conf.get(FS_STORAGE_OBJECT_SCHEME, DEFAULT_FS_STORAGE_OBJECT_SCHEME); + Preconditions.checkNotNull(schemes, "%s cannot be null", FS_STORAGE_OBJECT_SCHEME); + return Arrays.asList(schemes.split(",")); + } + + private static Set<String> errorStage = new HashSet<>(); + private static boolean testMode = false; + + public static void injectError(String stage) { + errorStage.add(stage); + testMode = true; + } + + public static void removeError(String stage) { + errorStage.remove(stage); + } + + public static <T extends Exception> void triggerError(Supplier<T> error, String stage) throws T { + if (testMode && errorStage.contains(stage)) { + throw error.get(); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Committer.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Committer.java new file mode 100644 index 00000000000..6180447ed34 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Committer.java @@ -0,0 +1,499 @@ +/* + * ByteDance Volcengine
EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.commit.ops.PendingOps; +import org.apache.hadoop.fs.tosfs.commit.ops.PendingOpsFactory; +import org.apache.hadoop.fs.tosfs.common.Tasks; +import org.apache.hadoop.fs.tosfs.common.ThreadPools; +import org.apache.hadoop.fs.tosfs.object.MultipartUpload; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.fs.tosfs.util.CommonUtils; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; + +public class Committer extends PathOutputCommitter { + private static final Logger LOG = LoggerFactory.getLogger(Committer.class); + + public static final String COMMITTER_THREADS = "fs.job.committer.threads"; + public static final String COMMITTER_SUMMARY_REPORT_DIR = "fs.job.committer.summary.report.directory"; + public static final int DEFAULT_COMMITTER_THREADS = Runtime.getRuntime().availableProcessors(); + public static final String THREADS_PREFIX = "job-committer-thread-pool"; + + private final String jobId; + private final Path outputPath; + // This is the directory for all intermediate work, where the output format will write data. 
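+ // For a task committer this resolves to the magic base path, e.g. (illustrative): + // ${outputPath}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}/__base.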
+ // This may not be on the final file system + private Path workPath; + private final String role; + private final Configuration conf; + private final FileSystem destFs; + private final ObjectStorage storage; + private final PendingOps ops; + + public Committer(Path outputPath, TaskAttemptContext context) throws IOException { + this(outputPath, context, String.format("Task committer %s", context.getTaskAttemptID())); + this.workPath = CommitUtils.magicTaskAttemptBasePath(context, outputPath); + LOG.info("Task attempt {} has work path {}", context.getTaskAttemptID(), getWorkPath()); + } + + public Committer(Path outputPath, JobContext context) throws IOException { + this(outputPath, context, String.format("Job committer %s", context.getJobID())); + } + + private Committer(Path outputPath, JobContext context, String role) throws IOException { + super(outputPath, context); + this.jobId = CommitUtils.buildJobId(context); + this.outputPath = outputPath; + this.role = role; + this.conf = context.getConfiguration(); + this.destFs = outputPath.getFileSystem(conf); + LOG.info("{} instantiated for job '{}' ID {} with destination {}", + role, + CommitUtils.jobName(context), + jobId, outputPath); + // Initialize the object storage. + this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), conf); + this.ops = PendingOpsFactory.create(destFs, storage); + } + + @Override + public Path getOutputPath() { + return outputPath; + } + + @Override + public Path getWorkPath() { + return workPath; + } + + @Override + public void setupJob(JobContext context) throws IOException { + checkJobId(context); + LOG.info("Setup Job {}", jobId); + Path jobOutput = getOutputPath(); + + // delete the success marker if exists. + destFs.delete(CommitUtils.successMarker(jobOutput), false); + + // create the destination directory. + destFs.mkdirs(jobOutput); + + logUncompletedMPUIfPresent(jobOutput); + + // Reset the job path, and create the job path with job attempt sub path. + Path jobPath = CommitUtils.magicJobPath(jobId, outputPath); + Path jobAttemptPath = CommitUtils.magicJobAttemptPath(context, outputPath); + destFs.delete(jobPath, true); + destFs.mkdirs(jobAttemptPath); + } + + private void logUncompletedMPUIfPresent(Path jobOutput) { + // do a scan and add warn log message for active uploads. + int nums = 0; + for (MultipartUpload upload : storage.listUploads(ObjectUtils.pathToKey(jobOutput, true))) { + if (nums++ > 10) { + LOG.warn("There are more than 10 uncompleted multipart uploads under path {}.", jobOutput); + break; + } + LOG.warn("Uncompleted multipart upload {} is under path {}, either jobs are running concurrently " + + "or failed jobs are not being cleaned up.", upload, jobOutput); + } + } + + @Override + public void commitJob(JobContext context) throws IOException { + checkJobId(context); + LOG.info("{}: committing job {}", role, jobId); + String stage = null; + Exception failure = null; + SuccessData successData = null; + + ExecutorService threadPool = ThreadPools.newWorkerPool(THREADS_PREFIX, commitThreads()); + List<FileStatus> pendingSets = Lists.newArrayList(); + try { + // Step.1 List active pending commits. + stage = "preparing"; + CommitUtils.listFiles(destFs, CommitUtils.magicJobAttemptPath(context, outputPath), true, f -> { + if (f.getPath().toString().endsWith(CommitUtils.PENDINGSET_SUFFIX)) { + pendingSets.add(f); + } + }); + + // Step.2 Load and commit those active pending commits. 
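+ // Each .pendingset file records the multipart uploads of one successfully committed task; + // committing them via PendingOps#commit makes the files visible under their final destination keys.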
+ stage = "commit"; + CommitContext commitCtxt = new CommitContext(pendingSets); + loadAndCommitPendingSets(threadPool, commitCtxt); + + // Step.3 Save the success marker. + stage = "marker"; + successData = createSuccessData(commitCtxt.destKeys()); + CommitUtils.triggerError(() -> new IOException("Mock error of success marker."), stage); + CommitUtils.save(destFs, CommitUtils.successMarker(outputPath), successData); + + // Step.4 Abort those orphan multipart uploads and cleanup the staging dir. + stage = "clean"; + cleanup(threadPool, true); + } catch (Exception e) { + failure = e; + LOG.warn("Commit failure for job {} stage {}", CommitUtils.buildJobId(context), stage, e); + + // Revert all pending sets when marker step fails. + if (stage.equals("marker")) { + CommonUtils.runQuietly( + () -> loadAndRevertPendingSets(threadPool, new CommitContext(pendingSets))); + } + CommonUtils.runQuietly(() -> cleanup(threadPool, true)); + throw e; + } finally { + saveSummaryReportQuietly(stage, context, successData, failure); + CommonUtils.runQuietly(threadPool::shutdown); + + cleanupResources(); + } + } + + private SuccessData createSuccessData(Iterable<String> filenames) { + SuccessData data = SuccessData.builder() + .setName(SuccessData.class.getName()) + .setCommitter(CommitUtils.COMMITTER_NAME) + .setTimestamp(System.currentTimeMillis()) + .setHostname(NetUtils.getHostname()) + .setDescription(role) + .setJobId(jobId) + .addFileNames(filenames) + .build(); + + data.addDiagnosticInfo(COMMITTER_THREADS, Integer.toString(commitThreads())); + return data; + } + + private void saveSummaryReportQuietly(String activeStage, JobContext context, SuccessData report, Throwable thrown) { + Configuration jobConf = context.getConfiguration(); + String reportDir = jobConf.get(COMMITTER_SUMMARY_REPORT_DIR, ""); + if (reportDir.isEmpty()) { + LOG.debug("Summary directory conf: {} is not set", COMMITTER_SUMMARY_REPORT_DIR); + return; + } + + Path path = CommitUtils.summaryReport(new Path(reportDir), jobId); + LOG.debug("Summary report path is {}", path); + + try { + if (report == null) { + report = createSuccessData(null); + } + if (thrown != null) { + report.recordJobFailure(thrown); + } + report.addDiagnosticInfo("stage", activeStage); + + CommitUtils.save(path.getFileSystem(jobConf), path, report); + LOG.info("Job summary saved to {}", path); + } catch (Exception e) { + LOG.warn("Failed to save summary to {}", path, e); + } + } + + private void loadAndCommitPendingSets(ExecutorService outerPool, CommitContext commitContext) { + ExecutorService innerPool = ThreadPools.newWorkerPool("commit-pending-files-pool", commitThreads()); + try { + Tasks.foreach(commitContext.pendingSets()) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(outerPool) + .abortWith(pendingSet -> loadAndAbort(innerPool, pendingSet)) + .revertWith(pendingSet -> loadAndRevert(innerPool, pendingSet)) + .run(pendingSet -> loadAndCommit(commitContext, innerPool, pendingSet)); + } finally { + CommonUtils.runQuietly(innerPool::shutdown); + } + } + + private void loadAndRevertPendingSets(ExecutorService outerPool, CommitContext commitContext) { + Tasks.foreach(commitContext.pendingSets()) + .throwFailureWhenFinished() + .executeWith(outerPool) + .run(pendingSet -> loadAndRevert(outerPool, pendingSet)); + } + + /** + * Load {@link PendingSet} from file and abort those {@link Pending} commits. 
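+ * Aborting cancels the underlying multipart uploads so that none of the pending data becomes visible.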
+ */ + private void loadAndAbort(ExecutorService pool, FileStatus pendingSetFile) { + PendingSet pendingSet = PendingSet.deserialize(destFs, pendingSetFile); + Tasks.foreach(pendingSet.commits()) + .suppressFailureWhenFinished() + .executeWith(pool) + .run(ops::abort); + } + + /** + * Load {@link PendingSet} from file and revert those {@link Pending} commits. + */ + private void loadAndRevert(ExecutorService pool, FileStatus pendingSetFile) { + PendingSet pendingSet = PendingSet.deserialize(destFs, pendingSetFile); + Tasks.foreach(pendingSet.commits()) + .suppressFailureWhenFinished() + .executeWith(pool) + .run(ops::revert); + } + + /** + * Load {@link PendingSet} from file and commit those {@link Pending} commits. + */ + private void loadAndCommit(CommitContext commitCtxt, ExecutorService pool, FileStatus pendingSetFile) { + PendingSet pendingSet = PendingSet.deserialize(destFs, pendingSetFile); + // Verify that the job ID of the pending set matches the committer's job ID. + String jobId = pendingSet.jobId(); + if (StringUtils.isNoneEmpty(jobId) && !Objects.equals(jobId, jobId())) { + throw new IllegalStateException(String.format("Mismatch in Job ID (%s) and commit job ID (%s)", jobId(), jobId)); + } + + Tasks.foreach(pendingSet.commits()) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(pool) + .onFailure((pending, exception) -> ops.abort(pending)) + .abortWith(ops::abort) + .revertWith(ops::revert) + .run(pending -> { + ops.commit(pending); + commitCtxt.addDestKey(pending.destKey()); + }); + } + + @Override + public void abortJob(JobContext context, JobStatus.State state) { + checkJobId(context); + LOG.info("{}: aborting job {} in state {}", role, jobId, state); + ExecutorService service = ThreadPools.newWorkerPool(THREADS_PREFIX, commitThreads()); + try { + cleanup(service, false); + } finally { + service.shutdown(); + + cleanupResources(); + } + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + checkJobId(context); + LOG.info("Setup Task {}", context.getTaskAttemptID()); + Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, outputPath); + // Delete the task attempt path if somehow it was there. + destFs.delete(taskAttemptBasePath, true); + // Make an empty directory. + destFs.mkdirs(taskAttemptBasePath); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return true; + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + checkJobId(context); + LOG.info("Commit task {}", context); + ExecutorService pool = ThreadPools.newWorkerPool(THREADS_PREFIX, commitThreads()); + try { + PendingSet commits = innerCommitTask(pool, context); + LOG.info("Task {} committed {} files", context.getTaskAttemptID(), commits.size()); + } catch (IOException e) { + LOG.error("Failed to commit task {}", context.getTaskAttemptID(), e); + throw e; + } finally { + // Shut down the thread pool quietly. + CommonUtils.runQuietly(pool::shutdown); + + // Delete the task attempt path quietly.
+ Path taskAttemptPath = CommitUtils.magicTaskAttemptPath(context, outputPath); + LOG.info("Delete task attempt path {}", taskAttemptPath); + CommonUtils.runQuietly(() -> destFs.delete(taskAttemptPath, true)); + } + } + + private PendingSet innerCommitTask(ExecutorService pool, TaskAttemptContext context) throws IOException { + Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, outputPath); + PendingSet pendingSet = new PendingSet(jobId); + try { + // Load the pending files and fill them into the pending set. + List<FileStatus> pendingFiles = CommitUtils.listPendingFiles(destFs, taskAttemptBasePath); + // Use a thread-safe collection to collect the pending list. + List<Pending> pendings = Collections.synchronizedList(Lists.newArrayList()); + Tasks.foreach(pendingFiles) + .throwFailureWhenFinished() + .executeWith(pool) + .run(f -> { + try { + byte[] data = CommitUtils.load(destFs, f.getPath()); + pendings.add(Pending.deserialize(data)); + } catch (IOException e) { + LOG.warn("Failed to load .pending file {}", f.getPath(), e); + throw new UncheckedIOException(e); + } + }); + pendingSet.addAll(pendings); + + // Add the extra task attempt id property. + String taskId = String.valueOf(context.getTaskAttemptID()); + pendingSet.addExtraData(CommitUtils.TASK_ATTEMPT_ID, taskId); + + // Save the pending set to the file system. + Path taskOutput = CommitUtils.magicTaskPendingSetPath(context, outputPath); + LOG.info("Saving work of {} to {}", taskId, taskOutput); + CommitUtils.save(destFs, taskOutput, pendingSet.serialize()); + + } catch (Exception e) { + LOG.error("Encountered an error when loading the pending set from {}", taskAttemptBasePath, e); + if (!pendingSet.commits().isEmpty()) { + Tasks.foreach(pendingSet.commits()) + .executeWith(pool) + .suppressFailureWhenFinished() + .run(ops::abort); + } + throw e; + } + + return pendingSet; + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + checkJobId(context); + Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, outputPath); + try { + // Load the pending files from the underlying filesystem.
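+ // Each .pending file describes one in-flight multipart upload; abort them all so that no partial data can be committed later.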
+ List<FileStatus> pendingFiles = CommitUtils.listPendingFiles(destFs, taskAttemptBasePath); + Tasks.foreach(pendingFiles) + .throwFailureWhenFinished() + .run(f -> { + try { + byte[] serializedData = CommitUtils.load(destFs, f.getPath()); + ops.abort(Pending.deserialize(serializedData)); + } catch (FileNotFoundException e) { + LOG.debug("Listed file already deleted: {}", f); + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + final FileStatus pendingFile = f; + CommonUtils.runQuietly(() -> destFs.delete(pendingFile.getPath(), false)); + } + }); + } finally { + CommonUtils.runQuietly(() -> destFs.delete(taskAttemptBasePath, true)); + } + } + + @Override + public void recoverTask(TaskAttemptContext context) { + checkJobId(context); + String taskId = context.getTaskAttemptID().toString(); + throw new UnsupportedOperationException(String.format("Unable to recover task %s, output: %s", taskId, outputPath)); + } + + private int commitThreads() { + return conf.getInt(COMMITTER_THREADS, DEFAULT_COMMITTER_THREADS); + } + + private void cleanup(ExecutorService pool, boolean suppress) { + LOG.info("Clean up the job by aborting the multipart uploads and cleaning the staging dir, suppress {}", suppress); + try { + Path jobOutput = getOutputPath(); + Iterable<MultipartUpload> pending = + storage.listUploads(ObjectUtils.pathToKey(CommitUtils.magicJobPath(jobId, jobOutput), true)); + Tasks.foreach(pending) + .executeWith(pool) + .suppressFailureWhenFinished() + .run(u -> storage.abortMultipartUpload(u.key(), u.uploadId())); + } catch (Exception e) { + if (suppress) { + LOG.error("The following exception has been suppressed during job cleanup", e); + } else { + throw e; + } + } finally { + CommonUtils.runQuietly(this::cleanupStagingDir); + } + } + + private void cleanupStagingDir() throws IOException { + // Note: different jobs share the same __magic folder, e.g. + // tos://bucket/path/to/table/__magic/job-A/..., and + // tos://bucket/path/to/table/__magic/job-B/... + // A job should only delete its own job folder so that it cannot break other running jobs, + // and the __magic folder itself should be deleted by the last job. + // This design is not race-free: one job may observe that no other job is running and start + // deleting __magic while another job begins to visit it at the same time. We think the + // probability of this is low and do not handle it.
+ destFs.delete(CommitUtils.magicJobPath(jobId, outputPath), true); + Path magicPath = CommitUtils.magicPath(outputPath); + if (destFs.listStatus(magicPath).length == 0) { + destFs.delete(magicPath, true); + } + } + + public String jobId() { + return jobId; + } + + private void checkJobId(JobContext context) { + String jobIdInContext = CommitUtils.buildJobId(context); + Preconditions.checkArgument(Objects.equals(jobId, jobIdInContext), + String.format("JobId set in the context: %s is not consistent with the initial jobId of the committer: %s, " + + "please check your settings in the taskAttemptContext.", jobIdInContext, jobId)); + } + + private void cleanupResources() { + CommonUtils.runQuietly(storage::close); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("role", role) + .add("jobId", jobId) + .add("outputPath", outputPath) + .toString(); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Pending.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Pending.java new file mode 100644 index 00000000000..5218912ed66 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Pending.java @@ -0,0 +1,181 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.tosfs.object.Part; +import org.apache.hadoop.fs.tosfs.util.JsonCodec; +import org.apache.hadoop.fs.tosfs.util.Serializer; +import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * Metadata that will be serialized as JSON and saved in the .pending files. + */ +public class Pending implements Serializer { + private static final JsonCodec<Pending> CODEC = new JsonCodec<>(Pending.class); + + private String bucket; + private String destKey; + private String uploadId; + private long length; + private long createdTimestamp; + private List<Part> parts; + + // No-arg constructor for json serializer, don't use.
+ public Pending() { + } + + public Pending( + String bucket, String destKey, + String uploadId, long length, + long createdTimestamp, List<Part> parts) { + this.bucket = bucket; + this.destKey = destKey; + this.uploadId = uploadId; + this.length = length; + this.createdTimestamp = createdTimestamp; + this.parts = parts; + } + + public String bucket() { + return bucket; + } + + public String destKey() { + return destKey; + } + + public String uploadId() { + return uploadId; + } + + public long length() { + return length; + } + + public long createdTimestamp() { + return createdTimestamp; + } + + public List<Part> parts() { + return parts; + } + + @Override + public byte[] serialize() throws IOException { + return CODEC.toBytes(this); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("bucket", bucket) + .add("destKey", destKey) + .add("uploadId", uploadId) + .add("length", length) + .add("createdTimestamp", createdTimestamp) + .add("uploadParts", StringUtils.join(parts, ",")) + .toString(); + } + + public static Pending deserialize(byte[] data) throws IOException { + return CODEC.fromBytes(data); + } + + @Override + public int hashCode() { + return Objects.hash(bucket, destKey, uploadId, length, createdTimestamp, parts); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof Pending)) { + return false; + } + Pending that = (Pending) o; + return Objects.equals(bucket, that.bucket) + && Objects.equals(destKey, that.destKey) + && Objects.equals(uploadId, that.uploadId) + && Objects.equals(length, that.length) + && Objects.equals(createdTimestamp, that.createdTimestamp) + && Objects.equals(parts, that.parts); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String bucket; + private String destKey; + private String uploadId; + private long length; + private long createdTimestamp; + private final List<Part> parts = Lists.newArrayList(); + + public Builder setBucket(String bucket) { + this.bucket = bucket; + return this; + } + + public Builder setDestKey(String destKey) { + this.destKey = destKey; + return this; + } + + public Builder setUploadId(String uploadId) { + this.uploadId = uploadId; + return this; + } + + public Builder setLength(long length) { + this.length = length; + return this; + } + + public Builder setCreatedTimestamp(long createdTimestamp) { + this.createdTimestamp = createdTimestamp; + return this; + } + + public Builder addParts(List<Part> parts) { + this.parts.addAll(parts); + return this; + } + + public Pending build() { + Preconditions.checkArgument(StringUtils.isNoneEmpty(bucket), "Empty bucket"); + Preconditions.checkArgument(StringUtils.isNoneEmpty(destKey), "Empty object destination key"); + Preconditions.checkArgument(StringUtils.isNoneEmpty(uploadId), "Empty uploadId"); + Preconditions.checkArgument(length >= 0, "Invalid length: %s", length); + parts.forEach(part -> Preconditions.checkArgument(StringUtils.isNoneEmpty(part.eTag()), "Empty etag")); + + return new Pending( + bucket, destKey, + uploadId, length, + createdTimestamp, parts); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/PendingSet.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/PendingSet.java new file mode 100644 index 00000000000..a6ca93d4d3e --- /dev/null +++
b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/PendingSet.java @@ -0,0 +1,123 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.tosfs.util.JsonCodec; +import org.apache.hadoop.fs.tosfs.util.Serializer; +import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class PendingSet implements Serializer { + private static final JsonCodec<PendingSet> CODEC = new JsonCodec<>(PendingSet.class); + + private String jobId; + private List<Pending> pendings; + private Map<String, String> extraData; + + // No-arg constructor for json serializer, don't use. + public PendingSet() { + } + + public PendingSet(String jobId) { + this(jobId, Lists.newArrayList()); + } + + public PendingSet(String jobId, List<Pending> pendings) { + this.jobId = jobId; + this.pendings = Lists.newArrayList(pendings); + this.extraData = Maps.newHashMap(); + } + + public PendingSet addAll(Iterable<Pending> items) { + Iterables.addAll(pendings, items); + return this; + } + + public PendingSet add(Pending pending) { + pendings.add(pending); + return this; + } + + public PendingSet addExtraData(String key, String val) { + extraData.put(key, val); + return this; + } + + public String jobId() { + return jobId; + } + + public List<Pending> commits() { + return pendings; + } + + public Map<String, String> extraData() { + return extraData; + } + + public int size() { + return pendings.size(); + } + + @Override + public byte[] serialize() throws IOException { + return CODEC.toBytes(this); + } + + public static PendingSet deserialize(byte[] data) { + try { + return CODEC.fromBytes(data); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static PendingSet deserialize(FileSystem fs, FileStatus f) { + try { + return deserialize(CommitUtils.load(fs, f.getPath())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public int hashCode() { + return Objects.hash(jobId, pendings, extraData); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof PendingSet)) { + return false; + } + PendingSet that = (PendingSet) o; + return Objects.equals(jobId, that.jobId) + && Objects.equals(pendings, that.pendings) + && Objects.equals(extraData, that.extraData); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/SuccessData.java 
b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/SuccessData.java new file mode 100644 index 00000000000..199d084f5a7 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/SuccessData.java @@ -0,0 +1,238 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit; + +import org.apache.hadoop.fs.tosfs.util.JsonCodec; +import org.apache.hadoop.fs.tosfs.util.Serializer; +import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects; +import org.apache.hadoop.thirdparty.com.google.common.base.Throwables; +import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class SuccessData implements Serializer { + private static final JsonCodec<SuccessData> CODEC = new JsonCodec<>(SuccessData.class); + + private String name; + private boolean success = true; + private long timestamp; + private String date; + private String hostname; + private String committer; + private String description; + private String jobId; + // Filenames in the commit. + private final List<String> filenames = new ArrayList<>(); + + // Diagnostics information. + private final Map<String, String> diagnostics = new HashMap<>(); + + // No-arg constructor for json serializer, Don't use. 
+ public SuccessData() { + } + + public SuccessData( + String name, boolean success, long timestamp, + String date, String hostname, String committer, + String description, String jobId, List<String> filenames) { + this.name = name; + this.success = success; + this.timestamp = timestamp; + this.date = date; + this.hostname = hostname; + this.committer = committer; + this.description = description; + this.jobId = jobId; + this.filenames.addAll(filenames); + } + + public String name() { + return name; + } + + public boolean success() { + return success; + } + + public long timestamp() { + return timestamp; + } + + public String date() { + return date; + } + + public String hostname() { + return hostname; + } + + public String committer() { + return committer; + } + + public String description() { + return description; + } + + public String jobId() { + return jobId; + } + + public Map<String, String> diagnostics() { + return diagnostics; + } + + public List<String> filenames() { + return filenames; + } + + public void recordJobFailure(Throwable thrown) { + this.success = false; + String stacktrace = Throwables.getStackTraceAsString(thrown); + addDiagnosticInfo("exception", thrown.toString()); + addDiagnosticInfo("stacktrace", stacktrace); + } + + public void addDiagnosticInfo(String key, String value) { + diagnostics.put(key, value); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("name", name) + .add("success", success) + .add("timestamp", timestamp) + .add("date", date) + .add("hostname", hostname) + .add("committer", committer) + .add("description", description) + .add("jobId", jobId) + .add("filenames", StringUtils.join(",", filenames)) + .toString(); + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public byte[] serialize() throws IOException { + return CODEC.toBytes(this); + } + + public static SuccessData deserialize(byte[] data) throws IOException { + return CODEC.fromBytes(data); + } + + @Override + public int hashCode() { + return Objects.hash(name, success, timestamp, date, hostname, committer, description, jobId, filenames); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof SuccessData)) { + return false; + } + SuccessData that = (SuccessData) o; + return Objects.equals(name, that.name) + && Objects.equals(success, that.success) + && Objects.equals(timestamp, that.timestamp) + && Objects.equals(date, that.date) + && Objects.equals(hostname, that.hostname) + && Objects.equals(committer, that.committer) + && Objects.equals(description, that.description) + && Objects.equals(jobId, that.jobId) + && Objects.equals(filenames, that.filenames); + } + + public static class Builder { + private String name = SuccessData.class.getName(); + private boolean success = true; + private long timestamp; + private String date; + private String hostname; + private String committer; + private String description; + private String jobId; + private final List<String> filenames = Lists.newArrayList(); + + public Builder setName(String name) { + this.name = name; + return this; + } + + public Builder setSuccess(boolean success) { + this.success = success; + return this; + } + + public Builder setTimestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + public Builder setDate(String date) { + this.date = date; + return this; + } + + public Builder setHostname(String hostname) { + this.hostname = hostname; + return this; + } + + public 
Builder setCommitter(String committer) { + this.committer = committer; + return this; + } + + public Builder setDescription(String description) { + this.description = description; + return this; + } + + public Builder setJobId(String jobId) { + this.jobId = jobId; + return this; + } + + public Builder addFileNames(Iterable<String> newFileNames) { + if (newFileNames != null) { + Iterables.addAll(this.filenames, newFileNames); + } + return this; + } + + public SuccessData build() { + return new SuccessData( + name, success, timestamp, + date, hostname, committer, + description, jobId, filenames); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java new file mode 100644 index 00000000000..9a2603942fd --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java @@ -0,0 +1,184 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit.mapred; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.commit.CommitUtils; +import org.apache.hadoop.mapred.FileOutputCommitter; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class Committer extends FileOutputCommitter { + private static final Logger LOG = LoggerFactory.getLogger(Committer.class); + private org.apache.hadoop.mapreduce.OutputCommitter wrapped = null; + + private static Path getOutputPath(JobContext context) { + JobConf conf = context.getJobConf(); + return FileOutputFormat.getOutputPath(conf); + } + + private static Path getOutputPath(TaskAttemptContext context) { + JobConf conf = context.getJobConf(); + return FileOutputFormat.getOutputPath(conf); + } + + private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(JobContext context) throws IOException { + if(wrapped == null) { + wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context)) + ? new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context) + : new org.apache.hadoop.mapred.FileOutputCommitter(); + LOG.debug("Using OutputCommitter implementation {}", wrapped.getClass().getName()); + } + return wrapped; + } + + @InterfaceAudience.Private + @Override + public Path getTaskAttemptPath(TaskAttemptContext context) throws IOException { + Path out = getOutputPath(context); + return out == null ? 
null : getTaskAttemptPath(context, out); + } + + private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(TaskAttemptContext context) throws IOException { + if(wrapped == null) { + wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context)) + ? new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context) + : new org.apache.hadoop.mapred.FileOutputCommitter(); + } + return wrapped; + } + + @Override + public Path getWorkPath(TaskAttemptContext context, Path outputPath) + throws IOException { + if (getWrapped(context) instanceof org.apache.hadoop.fs.tosfs.commit.Committer) { + return ((org.apache.hadoop.fs.tosfs.commit.Committer) getWrapped(context)).getWorkPath(); + } + return super.getWorkPath(context, outputPath); + } + + private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException { + Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf()); + if(workPath == null && out != null) { + if (getWrapped(context) instanceof org.apache.hadoop.fs.tosfs.commit.Committer) { + return CommitUtils.magicTaskAttemptPath(context, getOutputPath(context)); + } else { + return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter + .getTaskAttemptPath(context, out); + } + } + return workPath; + } + + @Override + public void setupJob(JobContext context) throws IOException { + getWrapped(context).setupJob(context); + } + + @Override + public void commitJob(JobContext context) throws IOException { + getWrapped(context).commitJob(context); + } + + @Override + @Deprecated + public void cleanupJob(JobContext context) throws IOException { + getWrapped(context).cleanupJob(context); + } + + @Override + public void abortJob(JobContext context, int runState) + throws IOException { + JobStatus.State state; + if(runState == JobStatus.State.RUNNING.getValue()) { + state = JobStatus.State.RUNNING; + } else if(runState == JobStatus.State.SUCCEEDED.getValue()) { + state = JobStatus.State.SUCCEEDED; + } else if(runState == JobStatus.State.FAILED.getValue()) { + state = JobStatus.State.FAILED; + } else if(runState == JobStatus.State.PREP.getValue()) { + state = JobStatus.State.PREP; + } else if(runState == JobStatus.State.KILLED.getValue()) { + state = JobStatus.State.KILLED; + } else { + throw new IllegalArgumentException(runState+" is not a valid runState."); + } + getWrapped(context).abortJob(context, state); + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + getWrapped(context).setupTask(context); + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + getWrapped(context).commitTask(context); + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + getWrapped(context).abortTask(context); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) + throws IOException { + return getWrapped(context).needsTaskCommit(context); + } + + @Override + @Deprecated + public boolean isRecoverySupported() { + return false; + } + + @Override + public boolean isCommitJobRepeatable(JobContext context) throws IOException { + return getWrapped(context).isCommitJobRepeatable(context); + } + + @Override + public boolean isRecoverySupported(JobContext context) throws IOException { + return getWrapped(context).isRecoverySupported(context); + } + + @Override + public void recoverTask(TaskAttemptContext context) + throws IOException { + getWrapped(context).recoverTask(context); + } + + public String 
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOps.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOps.java new file mode 100644 index 00000000000..09dffdeb728 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOps.java @@ -0,0 +1,43 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit.ops; + +import org.apache.hadoop.fs.tosfs.commit.Pending; + +public interface PendingOps { + /** + * Revert the committed {@link Pending}, typically by deleting the files that the commit made visible. + * + * @param commit the commit to revert. + */ + void revert(Pending commit); + + /** + * Abort the uncommitted {@link Pending} to prevent it from ever being committed. + * + * @param commit the commit to abort. + */ + void abort(Pending commit); + + /** + * Commit the {@link Pending} files to make them visible. To revert a completed commit, use + * {@link PendingOps#revert(Pending)}. + * + * @param commit the commit to make visible. + */ + void commit(Pending commit); +}
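To make the contract concrete, a committer might drive these operations roughly as follows; this is a sketch only, assuming fs, storage, and pending are already in scope, with error handling deliberately simplified:

    PendingOps ops = PendingOpsFactory.create(fs, storage);
    try {
      // Completing the multipart upload is what makes the destination key visible.
      ops.commit(pending);
    } catch (RuntimeException e) {
      // Nothing was committed yet, so abort the upload rather than revert it.
      ops.abort(pending);
    }
    // If a later job-level rollback must undo a completed commit:
    // ops.revert(pending);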
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOpsFactory.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOpsFactory.java new file mode 100644 index 00000000000..e84cb62837c --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOpsFactory.java @@ -0,0 +1,40 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit.ops; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; + +public class PendingOpsFactory { + public static final String PENDING_OPS_IMPL = "pending.ops.impl"; + public static final String DEFAULT_PENDING_OPS_IMPL = RawPendingOps.class.getName(); + + private PendingOpsFactory() { + } + + public static PendingOps create(FileSystem fs, ObjectStorage storage) { + String opsImpl = fs.getConf().get(PENDING_OPS_IMPL, DEFAULT_PENDING_OPS_IMPL); + try { + Class<?> clazz = Class.forName(opsImpl); + return (PendingOps) clazz + .getDeclaredConstructor(FileSystem.class, ObjectStorage.class) + .newInstance(fs, storage); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate PendingOps implementation: " + opsImpl, e); + } + } +}
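Because the implementation is resolved reflectively from configuration, a deployment could substitute its own ops class; the class name below is hypothetical, and any substitute must expose the (FileSystem, ObjectStorage) constructor the factory invokes:

    Configuration conf = fs.getConf();
    conf.set(PendingOpsFactory.PENDING_OPS_IMPL, "com.example.AuditedPendingOps");  // hypothetical class
    PendingOps ops = PendingOpsFactory.create(fs, storage);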
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/RawPendingOps.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/RawPendingOps.java new file mode 100644 index 00000000000..74fec4ec0a5 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/RawPendingOps.java @@ -0,0 +1,54 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.commit.ops; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.tosfs.commit.Pending; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PendingOps} implementation that reverts, aborts or commits the given {@link Pending} commit directly against the underlying object storage. + */ +public class RawPendingOps implements PendingOps { + private static final Logger LOG = LoggerFactory.getLogger(RawPendingOps.class); + + private final ObjectStorage storage; + + /** + * Constructor invoked reflectively by {@link PendingOpsFactory}; the {@link FileSystem} parameter is required by the factory's constructor contract but is not used here. + */ + public RawPendingOps(FileSystem fs, ObjectStorage storage) { + this.storage = storage; + } + + @Override + public void revert(Pending commit) { + LOG.info("Revert the commit by deleting the object key - {}", commit); + storage.delete(commit.destKey()); + } + + @Override + public void abort(Pending commit) { + LOG.info("Abort the commit by aborting the multipart upload - {}", commit); + storage.abortMultipartUpload(commit.destKey(), commit.uploadId()); + } + + @Override + public void commit(Pending commit) { + LOG.info("Commit by completing the multipart upload - {}", commit); + storage.completeUpload(commit.destKey(), commit.uploadId(), commit.parts()); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java index 8534fe5ef74..b607c69bf67 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java @@ -84,4 +84,10 @@ public class ConfKeys { */ public static final String OBJECT_RENAME_ENABLED = "fs.tos.rename.enabled"; public static final boolean OBJECT_RENAME_ENABLED_DEFAULT = false; + + /** + * The range size used when opening the object storage input stream. The value must be positive. + */ + public static final String OBJECT_STREAM_RANGE_SIZE = "proton.objectstorage.stream.range-size"; + public static final long OBJECT_STREAM_RANGE_SIZE_DEFAULT = Long.MAX_VALUE; }
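The new key is read and written like any other Hadoop configuration entry; the 8 MiB value below is purely illustrative, not a recommendation from this patch:

    Configuration conf = new Configuration();
    conf.setLong(ConfKeys.OBJECT_STREAM_RANGE_SIZE, 8L * 1024 * 1024);  // example: 8 MiB ranged reads
    long rangeSize = conf.getLong(ConfKeys.OBJECT_STREAM_RANGE_SIZE,
        ConfKeys.OBJECT_STREAM_RANGE_SIZE_DEFAULT);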
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/JsonCodec.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/JsonCodec.java new file mode 100644 index 00000000000..5b2d7371c73 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/JsonCodec.java @@ -0,0 +1,46 @@ +/* + * ByteDance Volcengine EMR, Copyright 2022. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.util; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class JsonCodec<T> { + private static final ObjectMapper MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true) + .configure(SerializationFeature.INDENT_OUTPUT, true) + .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + private final Class<T> clazz; + + public JsonCodec(Class<T> clazz) { + this.clazz = clazz; + } + + public byte[] toBytes(T instance) throws IOException { + return MAPPER.writeValueAsBytes(instance); + } + + public T fromBytes(byte[] data) throws IOException { + return MAPPER.readValue(new String(data, StandardCharsets.UTF_8), clazz); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/Serializer.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/Serializer.java new file mode 100644 index 00000000000..2dfc69100fd --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/Serializer.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.util; + +import java.io.IOException; + +public interface Serializer { + byte[] serialize() throws IOException; +}
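As a usage sketch, the codec round-trips one of the value classes from this patch; it assumes an already-populated SuccessData instance and the Jackson-friendly no-argument constructor that deserialization with this mapper configuration requires, neither of which is shown in this hunk:

    JsonCodec<SuccessData> codec = new JsonCodec<>(SuccessData.class);
    byte[] json = codec.toBytes(successData);      // pretty-printed UTF-8 JSON
    SuccessData restored = codec.fromBytes(json);  // rejects unknown fields by design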