http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java new file mode 100644 index 0000000..b6b6b97 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for parallel execution, takes closures for the various + * actions. + * There is no retry logic: it is expected to be handled by the closures. 
+ */ +public final class Tasks { + private static final Logger LOG = LoggerFactory.getLogger(Tasks.class); + + private Tasks() { + } + + /** + * Callback invoked to process an item. + * @param <I> item type being processed + * @param <E> exception class which may be raised + */ + @FunctionalInterface + public interface Task<I, E extends Exception> { + void run(I item) throws E; + } + + /** + * Callback invoked on a failure. + * @param <I> item type being processed + * @param <E> exception class which may be raised + */ + @FunctionalInterface + public interface FailureTask<I, E extends Exception> { + + /** + * process a failure. + * @param item item the task is processing + * @param exception the exception which was raised. + * @throws E Exception of type E + */ + void run(I item, Exception exception) throws E; + } + + /** + * Builder for task execution. + * @param <I> item type + */ + public static class Builder<I> { + private final Iterable<I> items; + private ExecutorService service = null; + private FailureTask<I, ?> onFailure = null; + private boolean stopOnFailure = false; + private boolean suppressExceptions = false; + private Task<I, ?> revertTask = null; + private boolean stopRevertsOnFailure = false; + private Task<I, ?> abortTask = null; + private boolean stopAbortsOnFailure = false; + + /** + * Create the builder. + * @param items items to process + */ + Builder(Iterable<I> items) { + this.items = items; + } + + /** + * Declare executor service: if null, the tasks are executed in a single + * thread. + * @param executorService service to schedule tasks with. + * @return this builder. 
+ */ + public Builder<I> executeWith(ExecutorService executorService) { + this.service = executorService; + return this; + } + + public Builder<I> onFailure(FailureTask<I, ?> task) { + this.onFailure = task; + return this; + } + + public Builder<I> stopOnFailure() { + this.stopOnFailure = true; + return this; + } + + public Builder<I> suppressExceptions() { + return suppressExceptions(true); + } + + public Builder<I> suppressExceptions(boolean suppress) { + this.suppressExceptions = suppress; + return this; + } + + public Builder<I> revertWith(Task<I, ?> task) { + this.revertTask = task; + return this; + } + + public Builder<I> stopRevertsOnFailure() { + this.stopRevertsOnFailure = true; + return this; + } + + public Builder<I> abortWith(Task<I, ?> task) { + this.abortTask = task; + return this; + } + + public Builder<I> stopAbortsOnFailure() { + this.stopAbortsOnFailure = true; + return this; + } + + public <E extends Exception> boolean run(Task<I, E> task) throws E { + if (service != null) { + return runParallel(task); + } else { + return runSingleThreaded(task); + } + } + + private <E extends Exception> boolean runSingleThreaded(Task<I, E> task) + throws E { + List<I> succeeded = new ArrayList<>(); + List<Exception> exceptions = new ArrayList<>(); + + Iterator<I> iterator = items.iterator(); + boolean threw = true; + try { + while (iterator.hasNext()) { + I item = iterator.next(); + try { + task.run(item); + succeeded.add(item); + + } catch (Exception e) { + exceptions.add(e); + + if (onFailure != null) { + try { + onFailure.run(item, e); + } catch (Exception failException) { + LOG.error("Failed to clean up on failure", e); + // keep going + } + } + + if (stopOnFailure) { + break; + } + } + } + + threw = false; + + } finally { + // threw handles exceptions that were *not* caught by the catch block, + // and exceptions that were caught and possibly handled by onFailure + // are kept in exceptions. 
+ if (threw || !exceptions.isEmpty()) { + if (revertTask != null) { + boolean failed = false; + for (I item : succeeded) { + try { + revertTask.run(item); + } catch (Exception e) { + LOG.error("Failed to revert task", e); + failed = true; + // keep going + } + if (stopRevertsOnFailure && failed) { + break; + } + } + } + + if (abortTask != null) { + boolean failed = false; + while (iterator.hasNext()) { + try { + abortTask.run(iterator.next()); + } catch (Exception e) { + failed = true; + LOG.error("Failed to abort task", e); + // keep going + } + if (stopAbortsOnFailure && failed) { + break; + } + } + } + } + } + + if (!suppressExceptions && !exceptions.isEmpty()) { + Tasks.<E>throwOne(exceptions); + } + + return !threw && exceptions.isEmpty(); + } + + private <E extends Exception> boolean runParallel(final Task<I, E> task) + throws E { + final Queue<I> succeeded = new ConcurrentLinkedQueue<>(); + final Queue<Exception> exceptions = new ConcurrentLinkedQueue<>(); + final AtomicBoolean taskFailed = new AtomicBoolean(false); + final AtomicBoolean abortFailed = new AtomicBoolean(false); + final AtomicBoolean revertFailed = new AtomicBoolean(false); + + List<Future<?>> futures = new ArrayList<>(); + + for (final I item : items) { + // submit a task for each item that will either run or abort the task + futures.add(service.submit(new Runnable() { + @Override + public void run() { + if (!(stopOnFailure && taskFailed.get())) { + // run the task + boolean threw = true; + try { + LOG.debug("Executing task"); + task.run(item); + succeeded.add(item); + LOG.debug("Task succeeded"); + + threw = false; + + } catch (Exception e) { + taskFailed.set(true); + exceptions.add(e); + LOG.info("Task failed", e); + + if (onFailure != null) { + try { + onFailure.run(item, e); + } catch (Exception failException) { + LOG.error("Failed to clean up on failure", e); + // swallow the exception + } + } + } finally { + if (threw) { + taskFailed.set(true); + } + } + + } else if (abortTask != null) 
{ + // abort the task instead of running it + if (stopAbortsOnFailure && abortFailed.get()) { + return; + } + + boolean failed = true; + try { + LOG.info("Aborting task"); + abortTask.run(item); + failed = false; + } catch (Exception e) { + LOG.error("Failed to abort task", e); + // swallow the exception + } finally { + if (failed) { + abortFailed.set(true); + } + } + } + } + })); + } + + // let the above tasks complete (or abort) + waitFor(futures); + int futureCount = futures.size(); + futures.clear(); + + if (taskFailed.get() && revertTask != null) { + // at least one task failed, revert any that succeeded + LOG.info("Reverting all {} succeeded tasks from {} futures", + succeeded.size(), futureCount); + for (final I item : succeeded) { + futures.add(service.submit(() -> { + if (stopRevertsOnFailure && revertFailed.get()) { + return; + } + + boolean failed = true; + try { + revertTask.run(item); + failed = false; + } catch (Exception e) { + LOG.error("Failed to revert task", e); + // swallow the exception + } finally { + if (failed) { + revertFailed.set(true); + } + } + })); + } + + // let the revert tasks complete + waitFor(futures); + } + + if (!suppressExceptions && !exceptions.isEmpty()) { + Tasks.<E>throwOne(exceptions); + } + + return !taskFailed.get(); + } + } + + /** + * Wait for all the futures to complete; there's a small sleep between + * each iteration; enough to yield the CPU. + * @param futures futures. 
+ */ + private static void waitFor(Collection<Future<?>> futures) { + int size = futures.size(); + LOG.debug("Waiting for {} tasks to complete", size); + int oldNumFinished = 0; + while (true) { + int numFinished = (int) futures.stream().filter(Future::isDone).count(); + + if (oldNumFinished != numFinished) { + LOG.debug("Finished count -> {}/{}", numFinished, size); + oldNumFinished = numFinished; + } + + if (numFinished == size) { + // all of the futures are done, stop looping + break; + } else { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + futures.forEach(future -> future.cancel(true)); + Thread.currentThread().interrupt(); + break; + } + } + } + } + + public static <I> Builder<I> foreach(Iterable<I> items) { + return new Builder<>(items); + } + + public static <I> Builder<I> foreach(I[] items) { + return new Builder<>(Arrays.asList(items)); + } + + @SuppressWarnings("unchecked") + private static <E extends Exception> void throwOne( + Collection<Exception> exceptions) + throws E { + Iterator<Exception> iter = exceptions.iterator(); + Exception e = iter.next(); + Class<? extends Exception> exceptionClass = e.getClass(); + + while (iter.hasNext()) { + Exception other = iter.next(); + if (!exceptionClass.isInstance(other)) { + e.addSuppressed(other); + } + } + + Tasks.<E>castAndThrow(e); + } + + @SuppressWarnings("unchecked") + private static <E extends Exception> void castAndThrow(Exception e) throws E { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw (E) e; + } +}
/**
 * Exception raised on validation failures; kept as an IOException
 * for consistency with other failures.
 */
public class ValidationFailure extends IOException {

  /**
   * Create an instance with string formatting applied to the message
   * and arguments.
   * If the message is not a valid format string for the supplied arguments
   * (for example it contains a literal {@code %}), the message is used
   * as is rather than raising a formatting exception.
   * @param message error message
   * @param args optional list of arguments
   */
  public ValidationFailure(String message, Object... args) {
    super(formatMessage(message, args));
  }

  /**
   * Format the message, falling back to the unformatted text when
   * {@code String.format} rejects it.
   * @param message error message; may be null
   * @param args optional list of arguments
   * @return the text to use as the exception message
   */
  private static String formatMessage(String message, Object... args) {
    if (message == null) {
      return null;
    }
    try {
      return String.format(message, args);
    } catch (java.util.IllegalFormatException e) {
      // a malformed format string must not mask the validation failure
      return message;
    }
  }

  /**
   * Verify that a condition holds.
   * @param expression expression which must be true
   * @param message message to raise on a failure
   * @param args arguments for the message formatting
   * @throws ValidationFailure on a failure
   */
  public static void verify(boolean expression,
      String message,
      Object... args) throws ValidationFailure {
    if (!expression) {
      throw new ValidationFailure(message, args);
    }
  }
}
+ */ + +package org.apache.hadoop.fs.s3a.commit.files; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.ValidationFailure; +import org.apache.hadoop.util.JsonSerialization; + +import static org.apache.hadoop.fs.s3a.commit.CommitUtils.validateCollectionClass; +import static org.apache.hadoop.fs.s3a.commit.ValidationFailure.verify; + +/** + * Persistent format for multiple pending commits. + * Contains 0 or more {@link SinglePendingCommit} entries; validation logic + * checks those values on load. + */ +@SuppressWarnings("unused") +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PendingSet extends PersistentCommitData { + private static final Logger LOG = LoggerFactory.getLogger(PendingSet.class); + + /** + * Supported version value: {@value}. + * If this is changed the value of {@link #serialVersionUID} will change, + * to avoid deserialization problems. + */ + public static final int VERSION = 1; + + /** + * Serialization ID: {@value}. + */ + private static final long serialVersionUID = 0x11000 + VERSION; + + + /** Version marker. */ + private int version = VERSION; + + /** + * Commit list. + */ + private List<SinglePendingCommit> commits; + + /** + * Any custom extra data committer subclasses may choose to add. + */ + private final Map<String, String> extraData = new HashMap<>(0); + + public PendingSet() { + this(0); + } + + + public PendingSet(int size) { + commits = new ArrayList<>(size); + } + + /** + * Get a JSON serializer for this class. + * @return a serializer. 
+ */ + public static JsonSerialization<PendingSet> serializer() { + return new JsonSerialization<>(PendingSet.class, false, true); + } + + /** + * Load an instance from a file, then validate it. + * @param fs filesystem + * @param path path + * @return the loaded instance + * @throws IOException IO failure + * @throws ValidationFailure if the data is invalid + */ + public static PendingSet load(FileSystem fs, Path path) + throws IOException { + LOG.debug("Reading pending commits in file {}", path); + PendingSet instance = serializer().load(fs, path); + instance.validate(); + return instance; + } + + /** + * Add a commit. + * @param commit the single commit + */ + public void add(SinglePendingCommit commit) { + commits.add(commit); + } + + /** + * Deserialize via java Serialization API: deserialize the instance + * and then call {@link #validate()} to verify that the deserialized + * data is valid. + * @param inStream input stream + * @throws IOException IO problem or validation failure + * @throws ClassNotFoundException reflection problems + */ + private void readObject(ObjectInputStream inStream) throws IOException, + ClassNotFoundException { + inStream.defaultReadObject(); + validate(); + } + + /** + * Validate the data: those fields which must be non empty, must be set. 
+ * @throws ValidationFailure if the data is invalid + */ + public void validate() throws ValidationFailure { + verify(version == VERSION, "Wrong version: %s", version); + validateCollectionClass(extraData.keySet(), String.class); + validateCollectionClass(extraData.values(), String.class); + Set<String> destinations = new HashSet<>(commits.size()); + validateCollectionClass(commits, SinglePendingCommit.class); + for (SinglePendingCommit c : commits) { + c.validate(); + verify(!destinations.contains(c.getDestinationKey()), + "Destination %s is written to by more than one pending commit", + c.getDestinationKey()); + destinations.add(c.getDestinationKey()); + } + } + + @Override + public byte[] toBytes() throws IOException { + return serializer().toBytes(this); + } + + /** + * Number of commits. + * @return the number of commits in this structure. + */ + public int size() { + return commits != null ? commits.size() : 0; + } + + @Override + public void save(FileSystem fs, Path path, boolean overwrite) + throws IOException { + serializer().save(fs, path, this, overwrite); + } + + /** @return the version marker. */ + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + + /** + * @return commit list. 
+ */ + public List<SinglePendingCommit> getCommits() { + return commits; + } + + public void setCommits(List<SinglePendingCommit> commits) { + this.commits = commits; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java new file mode 100644 index 0000000..cc27d07 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.files; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.ValidationFailure; + +/** + * Class for single/multiple commit data structures. + */ +@SuppressWarnings("serial") +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class PersistentCommitData implements Serializable { + + /** + * Supported version value: {@value}. + * If this is changed the value of {@code serialVersionUID} will change, + * to avoid deserialization problems. + */ + public static final int VERSION = 1; + + /** + * Validate the data: those fields which must be non empty, must be set. + * @throws ValidationFailure if the data is invalid + */ + public abstract void validate() throws ValidationFailure; + + /** + * Serialize to JSON and then to a byte array, after performing a + * preflight validation of the data to be saved. + * @return the data in a persistable form. + * @throws IOException serialization problem or validation failure. + */ + public abstract byte[] toBytes() throws IOException; + + /** + * Save to a hadoop filesystem. 
+ * @param fs filesystem + * @param path path + * @param overwrite should any existing file be overwritten + * @throws IOException IO exception + */ + public abstract void save(FileSystem fs, Path path, boolean overwrite) + throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java new file mode 100644 index 0000000..85cc38a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.files; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import com.amazonaws.services.s3.model.PartETag; +import com.google.common.base.Preconditions; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.ValidationFailure; +import org.apache.hadoop.util.JsonSerialization; + +import static org.apache.hadoop.fs.s3a.commit.CommitUtils.validateCollectionClass; +import static org.apache.hadoop.fs.s3a.commit.ValidationFailure.verify; +import static org.apache.hadoop.util.StringUtils.join; + +/** + * This is the serialization format for uploads yet to be committerd. + * + * It's marked as {@link Serializable} so that it can be passed in RPC + * calls; for this to work it relies on the fact that java.io ArrayList + * and LinkedList are serializable. If any other list type is used for etags, + * it must also be serialized. Jackson expects lists, and it is used + * to persist to disk. + * + */ +@SuppressWarnings("unused") +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class SinglePendingCommit extends PersistentCommitData + implements Iterable<String> { + + /** + * Serialization ID: {@value}. + */ + private static final long serialVersionUID = 0x10000 + VERSION; + + /** Version marker. */ + private int version = VERSION; + + /** + * This is the filename of the pending file itself. + * Used during processing; it's persistent value, if any, is ignored. + */ + private String filename; + + /** Path URI of the destination. 
*/ + private String uri = ""; + + /** ID of the upload. */ + private String uploadId; + + /** Destination bucket. */ + private String bucket; + + /** Destination key in the bucket. */ + private String destinationKey; + + /** When was the upload created? */ + private long created; + + /** When was the upload saved? */ + private long saved; + + /** timestamp as date; no expectation of parseability. */ + private String date; + + /** Job ID, if known. */ + private String jobId = ""; + + /** Task ID, if known. */ + private String taskId = ""; + + /** Arbitrary notes. */ + private String text = ""; + + /** Ordered list of etags. */ + private List<String> etags; + + /** + * Any custom extra data committer subclasses may choose to add. + */ + private Map<String, String> extraData = new HashMap<>(0); + + /** Destination file size. */ + private long length; + + public SinglePendingCommit() { + } + + /** + * Get a JSON serializer for this class. + * @return a serializer. + */ + public static JsonSerialization<SinglePendingCommit> serializer() { + return new JsonSerialization<>(SinglePendingCommit.class, false, true); + } + + /** + * Load an instance from a file, then validate it. + * @param fs filesystem + * @param path path + * @return the loaded instance + * @throws IOException IO failure + * @throws ValidationFailure if the data is invalid + */ + public static SinglePendingCommit load(FileSystem fs, Path path) + throws IOException { + SinglePendingCommit instance = serializer().load(fs, path); + instance.filename = path.toString(); + instance.validate(); + return instance; + } + + /** + * Deserialize via java Serialization API: deserialize the instance + * and then call {@link #validate()} to verify that the deserialized + * data is valid. 
+ * @param inStream input stream + * @throws IOException IO problem + * @throws ClassNotFoundException reflection problems + * @throws ValidationFailure validation failure + */ + private void readObject(ObjectInputStream inStream) throws IOException, + ClassNotFoundException { + inStream.defaultReadObject(); + validate(); + } + + /** + * Set the various timestamp fields to the supplied value. + * @param millis time in milliseconds + */ + public void touch(long millis) { + created = millis; + saved = millis; + date = new Date(millis).toString(); + } + + /** + * Set the commit data. + * @param parts ordered list of etags. + * @throws ValidationFailure if the data is invalid + */ + public void bindCommitData(List<PartETag> parts) throws ValidationFailure { + etags = new ArrayList<>(parts.size()); + int counter = 1; + for (PartETag part : parts) { + verify(part.getPartNumber() == counter, + "Expected part number %s but got %s", counter, part.getPartNumber()); + etags.add(part.getETag()); + counter++; + } + } + + @Override + public void validate() throws ValidationFailure { + verify(version == VERSION, "Wrong version: %s", version); + verify(StringUtils.isNotEmpty(bucket), "Empty bucket"); + verify(StringUtils.isNotEmpty(destinationKey), + "Empty destination"); + verify(StringUtils.isNotEmpty(uploadId), "Empty uploadId"); + verify(length >= 0, "Invalid length: " + length); + destinationPath(); + verify(etags != null, "No etag list"); + validateCollectionClass(etags, String.class); + for (String etag : etags) { + verify(StringUtils.isNotEmpty(etag), "Empty etag"); + } + if (extraData != null) { + validateCollectionClass(extraData.keySet(), String.class); + validateCollectionClass(extraData.values(), String.class); + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "DelayedCompleteData{"); + sb.append("version=").append(version); + sb.append(", uri='").append(uri).append('\''); + sb.append(", 
destination='").append(destinationKey).append('\''); + sb.append(", uploadId='").append(uploadId).append('\''); + sb.append(", created=").append(created); + sb.append(", saved=").append(saved); + sb.append(", size=").append(length); + sb.append(", date='").append(date).append('\''); + sb.append(", jobId='").append(jobId).append('\''); + sb.append(", taskId='").append(taskId).append('\''); + sb.append(", notes='").append(text).append('\''); + if (etags != null) { + sb.append(", etags=["); + sb.append(join(",", etags)); + sb.append(']'); + } else { + sb.append(", etags=null"); + } + sb.append('}'); + return sb.toString(); + } + + @Override + public byte[] toBytes() throws IOException { + validate(); + return serializer().toBytes(this); + } + + @Override + public void save(FileSystem fs, Path path, boolean overwrite) + throws IOException { + serializer().save(fs, path, this, overwrite); + } + + /** + * Build the destination path of the object. + * @return the path + * @throws IllegalStateException if the URI is invalid + */ + public Path destinationPath() { + Preconditions.checkState(StringUtils.isNotEmpty(uri), "Empty uri"); + try { + return new Path(new URI(uri)); + } catch (URISyntaxException e) { + throw new IllegalStateException("Cannot parse URI " + uri); + } + } + + /** + * Get the number of etags. + * @return the size of the etag list. + */ + public int getPartCount() { + return etags.size(); + } + + /** + * Iterate over the etags. + * @return an iterator. + */ + @Override + public Iterator<String> iterator() { + return etags.iterator(); + } + + /** @return version marker. */ + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + + /** + * This is the filename of the pending file itself. + * Used during processing; it's persistent value, if any, is ignored. 
+ * @return filename + */ + public String getFilename() { + return filename; + } + + public void setFilename(String filename) { + this.filename = filename; + } + + /** @return path URI of the destination. */ + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + /** @return ID of the upload. */ + public String getUploadId() { + return uploadId; + } + + public void setUploadId(String uploadId) { + this.uploadId = uploadId; + } + + /** @return destination bucket. */ + public String getBucket() { + return bucket; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + /** @return destination key in the bucket. */ + public String getDestinationKey() { + return destinationKey; + } + + public void setDestinationKey(String destinationKey) { + this.destinationKey = destinationKey; + } + + /** + * When was the upload created? + * @return timestamp + */ + public long getCreated() { + return created; + } + + public void setCreated(long created) { + this.created = created; + } + + /** + * When was the upload saved? + * @return timestamp + */ + public long getSaved() { + return saved; + } + + public void setSaved(long saved) { + this.saved = saved; + } + + /** + * Timestamp as date; no expectation of parseability. + * @return date string + */ + public String getDate() { + return date; + } + + public void setDate(String date) { + this.date = date; + } + + /** @return Job ID, if known. */ + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + /** @return Task ID, if known. */ + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + /** + * Arbitrary notes. + * @return any notes + */ + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + /** @return ordered list of etags. 
*/ + public List<String> getEtags() { + return etags; + } + + public void setEtags(List<String> etags) { + this.etags = etags; + } + + /** + * Any custom extra data committer subclasses may choose to add. + * @return custom data + */ + public Map<String, String> getExtraData() { + return extraData; + } + + public void setExtraData(Map<String, String> extraData) { + this.extraData = extraData; + } + + /** + * Destination file size. + * @return size of destination object + */ + public long getLength() { + return length; + } + + public void setLength(long length) { + this.length = length; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java new file mode 100644 index 0000000..6cf1f1e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.files; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.ValidationFailure; +import org.apache.hadoop.util.JsonSerialization; + +/** + * Summary data saved into a {@code _SUCCESS} marker file. + * + * This provides an easy way to determine which committer was used + * to commit work. + * <ol> + * <li>File length == 0: classic {@code FileOutputCommitter}.</li> + * <li>Loadable as {@link SuccessData}: + * A s3guard committer with its name in the {@link #committer} field.</li> + * <li>Not loadable? Something else.</li> + * </ol> + * + * This is an unstable structure intended for diagnostics and testing. + * Applications reading this data should use/check the {@link #name} field + * to differentiate from any other JSON-based manifest and to identify + * changes in the output format. + */ +@SuppressWarnings("unused") +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class SuccessData extends PersistentCommitData { + private static final Logger LOG = LoggerFactory.getLogger(SuccessData.class); + + /** + * Serialization ID: {@value}. + */ + private static final long serialVersionUID = 507133045258460084L; + + /** + * Name to include in persisted data, so as to differentiate from + * any other manifests: {@value}. 
+ */ + public static final String NAME + = "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1"; + + /** + * Name of file; includes version marker. + */ + private String name; + + /** Timestamp of creation. */ + private long timestamp; + + /** Timestamp as date string; no expectation of parseability. */ + private String date; + + /** + * Host which created the file (implicitly: committed the work). + */ + private String hostname; + + /** + * Committer name. + */ + private String committer; + + /** + * Description text. + */ + private String description; + + /** + * Metrics. + */ + private Map<String, Long> metrics = new HashMap<>(); + + /** + * Diagnostics information. + */ + private Map<String, String> diagnostics = new HashMap<>(); + + /** + * Filenames in the commit. + */ + private List<String> filenames = new ArrayList<>(0); + + @Override + public void validate() throws ValidationFailure { + ValidationFailure.verify(name != null, + "Incompatible file format: no 'name' field"); + ValidationFailure.verify(NAME.equals(name), + "Incompatible file format: " + name); + } + + @Override + public byte[] toBytes() throws IOException { + return serializer().toBytes(this); + } + + @Override + public void save(FileSystem fs, Path path, boolean overwrite) + throws IOException { + // always set the name field before being saved. + name = NAME; + serializer().save(fs, path, this, overwrite); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "SuccessData{"); + sb.append("committer='").append(committer).append('\''); + sb.append(", hostname='").append(hostname).append('\''); + sb.append(", description='").append(description).append('\''); + sb.append(", date='").append(date).append('\''); + sb.append(", filenames=[").append( + StringUtils.join(filenames, ", ")) + .append("]"); + sb.append('}'); + return sb.toString(); + } + + /** + * Dump the metrics (if any) to a string. + * The metrics are sorted for ease of viewing. 
+ * @param prefix prefix before every entry + * @param middle string between key and value + * @param suffix suffix to each entry + * @return the dumped string + */ + public String dumpMetrics(String prefix, String middle, String suffix) { + return joinMap(metrics, prefix, middle, suffix); + } + + /** + * Dump the diagnostics (if any) to a string. + * @param prefix prefix before every entry + * @param middle string between key and value + * @param suffix suffix to each entry + * @return the dumped string + */ + public String dumpDiagnostics(String prefix, String middle, String suffix) { + return joinMap(diagnostics, prefix, middle, suffix); + } + + /** + * Join any map of string to value into a string, sorting the keys first. + * @param map map to join + * @param prefix prefix before every entry + * @param middle string between key and value + * @param suffix suffix to each entry + * @return a string for reporting. + */ + protected static String joinMap(Map<String, ?> map, + String prefix, + String middle, String suffix) { + if (map == null) { + return ""; + } + List<String> list = new ArrayList<>(map.keySet()); + Collections.sort(list); + StringBuilder sb = new StringBuilder(list.size() * 32); + for (String k : list) { + sb.append(prefix) + .append(k) + .append(middle) + .append(map.get(k)) + .append(suffix); + } + return sb.toString(); + } + + /** + * Load an instance from a file, then validate it. + * @param fs filesystem + * @param path path + * @return the loaded instance + * @throws IOException IO failure + * @throws ValidationFailure if the data is invalid + */ + public static SuccessData load(FileSystem fs, Path path) + throws IOException { + LOG.debug("Reading success data from {}", path); + SuccessData instance = serializer().load(fs, path); + instance.validate(); + return instance; + } + + /** + * Get a JSON serializer for this class. + * @return a serializer. 
+ */ + private static JsonSerialization<SuccessData> serializer() { + return new JsonSerialization<>(SuccessData.class, false, true); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + /** @return timestamp of creation. */ + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + /** @return timestamp as date; no expectation of parseability. */ + public String getDate() { + return date; + } + + public void setDate(String date) { + this.date = date; + } + + /** + * @return host which created the file (implicitly: committed the work). + */ + public String getHostname() { + return hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + /** + * @return committer name. + */ + public String getCommitter() { + return committer; + } + + public void setCommitter(String committer) { + this.committer = committer; + } + + /** + * @return any description text. + */ + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + /** + * @return any metrics. + */ + public Map<String, Long> getMetrics() { + return metrics; + } + + public void setMetrics(Map<String, Long> metrics) { + this.metrics = metrics; + } + + /** + * @return a list of filenames in the commit. + */ + public List<String> getFilenames() { + return filenames; + } + + public void setFilenames(List<String> filenames) { + this.filenames = filenames; + } + + public Map<String, String> getDiagnostics() { + return diagnostics; + } + + public void setDiagnostics(Map<String, String> diagnostics) { + this.diagnostics = diagnostics; + } + + /** + * Add a diagnostics entry. 
+ * @param key name + * @param value value + */ + public void addDiagnostic(String key, String value) { + diagnostics.put(key, value); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/package-info.java new file mode 100644 index 0000000..0d57494 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/package-info.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Persistent data formats for the committers. 
+ * + * All of these formats share a base class of + * {@link org.apache.hadoop.fs.s3a.commit.files.PersistentCommitData}; + * the subclasses record + * <ol> + * <li>The content of a single pending commit + * (used by the Magic committer).</li> + * <li>The list of all the files uploaded by a staging committer.</li> + * <li>The summary information saved in the {@code _SUCCESS} file.</li> + * </ol> + * + * There are no guarantees of stability between versions; these are internal + * structures. + * + * The {@link org.apache.hadoop.fs.s3a.commit.files.SuccessData} file is + * the one visible to callers after a job completes; it is an unstable + * manifest intended for testing only. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.fs.s3a.commit.files; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java new file mode 100644 index 0000000..cf365c2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; + +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.commit.PutTracker; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; + +/** + * Put tracker for Magic commits. + * <p>Important</p>: must not directly or indirectly import a class which + * uses any datatype in hadoop-mapreduce. + */ +@InterfaceAudience.Private +public class MagicCommitTracker extends PutTracker { + public static final Logger LOG = LoggerFactory.getLogger( + MagicCommitTracker.class); + + private final String originalDestKey; + private final String pendingPartKey; + private final Path path; + private final WriteOperationHelper writer; + private final String bucket; + private static final byte[] EMPTY = new byte[0]; + + /** + * Magic commit tracker. + * @param path path nominally being written to + * @param bucket dest bucket + * @param originalDestKey the original key, in the magic directory. 
+ * @param destKey key for the destination + * @param pendingsetKey key of the pendingset file + * @param writer writer instance to use for operations + */ + public MagicCommitTracker(Path path, + String bucket, + String originalDestKey, + String destKey, + String pendingsetKey, + WriteOperationHelper writer) { + super(destKey); + this.bucket = bucket; + this.path = path; + this.originalDestKey = originalDestKey; + this.pendingPartKey = pendingsetKey; + this.writer = writer; + LOG.info("File {} is written as magic file to path {}", + path, destKey); + } + + /** + * Initialize the tracker. + * @return true, indicating that the multipart commit must start. + * @throws IOException any IO problem. + */ + @Override + public boolean initialize() throws IOException { + return true; + } + + /** + * Flag to indicate that output is not visible after the stream + * is closed. + * @return false + */ + @Override + public boolean outputImmediatelyVisible() { + return false; + } + + /** + * Complete operation: generate the final commit data, put it. + * @param uploadId Upload ID + * @param parts list of parts + * @param bytesWritten bytes written + * @return false, indicating that the commit must fail. + * @throws IOException any IO problem. 
+ * @throws IllegalArgumentException bad argument + */ + @Override + public boolean aboutToComplete(String uploadId, + List<PartETag> parts, + long bytesWritten) + throws IOException { + Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId), + "empty/null upload ID: "+ uploadId); + Preconditions.checkArgument(parts != null, + "No uploaded parts list"); + Preconditions.checkArgument(!parts.isEmpty(), + "No uploaded parts to save"); + SinglePendingCommit commitData = new SinglePendingCommit(); + commitData.touch(System.currentTimeMillis()); + commitData.setDestinationKey(getDestKey()); + commitData.setBucket(bucket); + commitData.setUri(path.toUri().toString()); + commitData.setUploadId(uploadId); + commitData.setText(""); + commitData.setLength(bytesWritten); + commitData.bindCommitData(parts); + byte[] bytes = commitData.toBytes(); + LOG.info("Uncommitted data pending to file {};" + + " commit metadata for {} parts in {}. size: {} byte(s)", + path.toUri(), parts.size(), pendingPartKey, bytesWritten); + LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}", + path, pendingPartKey, commitData); + // save the serialized commit data under the pending part key + PutObjectRequest put = writer.createPutObjectRequest( + pendingPartKey, + new ByteArrayInputStream(bytes), + bytes.length); + writer.uploadObject(put); + + // now put a 0-byte file with the name of the original under-magic path + PutObjectRequest originalDestPut = writer.createPutObjectRequest( + originalDestKey, + new ByteArrayInputStream(EMPTY), + 0); + writer.uploadObject(originalDestPut); + return false; + } + + /** + * @return a string listing the destination key, pending part key, + * path and writer. + */ + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "MagicCommitTracker{"); + // first field: no leading separator after the opening brace + sb.append("destKey=").append(getDestKey()); + sb.append(", pendingPartKey='").append(pendingPartKey).append('\''); + sb.append(", path=").append(path); + sb.append(", writer=").append(writer); + sb.append('}'); + return sb.toString(); + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java new file mode 100644 index 0000000..c305141 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import java.io.IOException; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; +import org.apache.hadoop.fs.s3a.commit.CommitOperations; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR; +import org.apache.hadoop.fs.s3a.commit.DurationInfo; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; +import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*; +import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*; + +/** + * This is a dedicated committer which requires the "magic" directory feature + * of the S3A Filesystem to be enabled; it then uses paths for task and job + * attempts in magic paths, so as to ensure that the final output goes direct + * to the destination directory. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class MagicS3GuardCommitter extends AbstractS3ACommitter { + private static final Logger LOG = + LoggerFactory.getLogger(MagicS3GuardCommitter.class); + + /** Name: {@value}. */ + public static final String NAME = CommitConstants.COMMITTER_NAME_MAGIC; + + /** + * Create a task committer. 
+ * @param outputPath the job's output path + * @param context the task's context + * @throws IOException on a failure + */ + public MagicS3GuardCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + setWorkPath(getTaskAttemptPath(context)); + verifyIsMagicCommitPath(getDestS3AFS(), getWorkPath()); + LOG.debug("Task attempt {} has work path {}", + context.getTaskAttemptID(), + getWorkPath()); + } + + @Override + public String getName() { + return NAME; + } + + /** + * Require magic paths in the FS client. + * @return true, always. + */ + @Override + protected boolean requiresDelayedCommitOutputInFileSystem() { + return true; + } + + @Override + public void setupJob(JobContext context) throws IOException { + try (DurationInfo d = new DurationInfo(LOG, + "Setup Job %s", jobIdString(context))) { + Path jobAttemptPath = getJobAttemptPath(context); + getDestinationFS(jobAttemptPath, + context.getConfiguration()).mkdirs(jobAttemptPath); + } + } + + /** + * Get the list of pending uploads for this job attempt, by listing + * all .pendingset files in the job attempt directory. + * @param context job context + * @return a list of pending commits. + * @throws IOException Any IO failure + */ + protected List<SinglePendingCommit> listPendingUploadsToCommit( + JobContext context) + throws IOException { + FileSystem fs = getDestFS(); + return loadPendingsetFiles(context, false, fs, + listAndFilter(fs, getJobAttemptPath(context), false, + CommitOperations.PENDINGSET_FILTER)); + } + + /** + * Delete the magic directory. 
+ */ + public void cleanupStagingDirs() { + Path path = magicSubdir(getOutputPath()); + Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(), + () -> deleteWithWarning(getDestFS(), path, true)); + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + try (DurationInfo d = new DurationInfo(LOG, + "Setup Task %s", context.getTaskAttemptID())) { + Path taskAttemptPath = getTaskAttemptPath(context); + FileSystem fs = taskAttemptPath.getFileSystem(getConf()); + fs.mkdirs(taskAttemptPath); + } + } + + /** + * Did this task write any files in the work directory? + * Probes for a task existing by looking to see if the attempt dir exists. + * This adds more HTTP requests to the call. It may be better just to + * return true and rely on the commit task doing the work. + * @param context the task's context + * @return true if the attempt path exists + * @throws IOException failure to list the path + */ + @Override + public boolean needsTaskCommit(TaskAttemptContext context) + throws IOException { + Path taskAttemptPath = getTaskAttemptPath(context); + try (DurationInfo d = new DurationInfo(LOG, + "needsTaskCommit task %s", context.getTaskAttemptID())) { + return taskAttemptPath.getFileSystem( + context.getConfiguration()) + .exists(taskAttemptPath); + } + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + try (DurationInfo d = new DurationInfo(LOG, + "Commit task %s", context.getTaskAttemptID())) { + PendingSet commits = innerCommitTask(context); + LOG.info("Task {} committed {} files", context.getTaskAttemptID(), + commits.size()); + } catch (IOException e) { + getCommitOperations().taskCompleted(false); + throw e; + } finally { + // delete the task attempt so there's no possibility of a second attempt + deleteTaskAttemptPathQuietly(context); + } + getCommitOperations().taskCompleted(true); + } + + /** + * Inner routine for committing a task. 
+ * The list of pending commits is loaded and then saved to the job attempt + * dir. + * Failure to load any file or save the final file triggers an abort of + * all known pending commits. + * @param context context + * @return the summary file + * @throws IOException exception + */ + private PendingSet innerCommitTask( + TaskAttemptContext context) throws IOException { + Path taskAttemptPath = getTaskAttemptPath(context); + // load in all pending commits. + CommitOperations actions = getCommitOperations(); + Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> + loaded = actions.loadSinglePendingCommits( + taskAttemptPath, true); + PendingSet pendingSet = loaded.getKey(); + List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue(); + if (!failures.isEmpty()) { + // At least one file failed to load + // revert all which did; report failure with first exception + LOG.error("At least one commit file could not be read: failing"); + abortPendingUploads(context, pendingSet.getCommits(), true); + throw failures.get(0).getValue(); + } + // patch in IDs + String jobId = String.valueOf(context.getJobID()); + String taskId = String.valueOf(context.getTaskAttemptID()); + for (SinglePendingCommit commit : pendingSet.getCommits()) { + commit.setJobId(jobId); + commit.setTaskId(taskId); + } + + Path jobAttemptPath = getJobAttemptPath(context); + TaskAttemptID taskAttemptID = context.getTaskAttemptID(); + Path taskOutcomePath = new Path(jobAttemptPath, + taskAttemptID.getTaskID().toString() + + CommitConstants.PENDINGSET_SUFFIX); + LOG.info("Saving work of {} to {}", taskAttemptID, taskOutcomePath); + try { + pendingSet.save(getDestFS(), taskOutcomePath, false); + } catch (IOException e) { + LOG.warn("Failed to save task commit data to {} ", + taskOutcomePath, e); + abortPendingUploads(context, pendingSet.getCommits(), true); + throw e; + } + return pendingSet; + } + + /** + * Abort a task. 
Attempt load then abort all pending files, + * then try to delete the task attempt path. + * This method may be called on the job committer, rather than the + * task one (such as in the MapReduce AM after a task container failure). + * It must extract all paths and state from the passed in context. + * @param context task context + * @throws IOException if there was some problem querying the path other + * than it not actually existing. + */ + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + Path attemptPath = getTaskAttemptPath(context); + try (DurationInfo d = new DurationInfo(LOG, + "Abort task %s", context.getTaskAttemptID())) { + getCommitOperations().abortAllSinglePendingCommits(attemptPath, true); + } finally { + deleteQuietly( + attemptPath.getFileSystem(context.getConfiguration()), + attemptPath, true); + } + } + + /** + * Compute the path where the output of a given job attempt will be placed. + * @param appAttemptId the ID of the application attempt for this job. + * @return the path to store job attempt data. + */ + protected Path getJobAttemptPath(int appAttemptId) { + return getMagicJobAttemptPath(appAttemptId, getOutputPath()); + } + + /** + * Compute the path where the output of a task attempt is stored until + * that task is committed. + * + * @param context the context of the task attempt. + * @return the path where a task attempt should be stored. + */ + public Path getTaskAttemptPath(TaskAttemptContext context) { + return getMagicTaskAttemptPath(context, getOutputPath()); + } + + @Override + protected Path getBaseTaskAttemptPath(TaskAttemptContext context) { + return getBaseMagicTaskAttemptPath(context, getOutputPath()); + } + + /** + * Get a temporary directory for data. When a task is aborted/cleaned + * up, the contents of this directory are all deleted. + * @param context task context + * @return a path for temporary data. 
+ */ + public Path getTempTaskAttemptPath(TaskAttemptContext context) { + return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java new file mode 100644 index 0000000..0392491 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; + +/** + * Factory for the Magic committer. + */ +public class MagicS3GuardCommitterFactory + extends AbstractS3ACommitterFactory { + + /** + * Name of this class: {@value}. + */ + public static final String CLASSNAME + = "org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory"; + + public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem, + Path outputPath, + TaskAttemptContext context) throws IOException { + return new MagicS3GuardCommitter(outputPath, context); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/package-info.java new file mode 100644 index 0000000..56925e9 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This is the "Magic" committer and support. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.fs.s3a.commit.magic; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/package-info.java new file mode 100644 index 0000000..7647675 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Support for committing the output of analytics jobs directly to S3. + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.fs.s3a.commit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/ConflictResolution.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/ConflictResolution.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/ConflictResolution.java new file mode 100644 index 0000000..52d3411 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/ConflictResolution.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +/** + * Enum of conflict resolution algorithms. 
+ */ +public enum ConflictResolution { + /** Fail. */ + FAIL, + + /** Merge new data with existing data. */ + APPEND, + + /** Overwrite existing data. */ + REPLACE +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java new file mode 100644 index 0000000..03dfed2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*; + +/** + * This commits to a directory. + * The conflict policy is + * <ul> + * <li>FAIL: fail the commit</li> + * <li>APPEND: add extra data to the destination.</li> + * <li>REPLACE: delete the destination directory in the job commit + * (i.e. after and only if all tasks have succeeded).</li> + * </ul> + */ +public class DirectoryStagingCommitter extends StagingCommitter { + private static final Logger LOG = LoggerFactory.getLogger( + DirectoryStagingCommitter.class); + + /** Name: {@value}. */ + public static final String NAME = COMMITTER_NAME_DIRECTORY; + + public DirectoryStagingCommitter(Path outputPath, TaskAttemptContext context) + throws IOException { + super(outputPath, context); + } + + @Override + public String getName() { + return NAME; + } + + @Override + public void setupJob(JobContext context) throws IOException { + super.setupJob(context); + Path outputPath = getOutputPath(); + FileSystem fs = getDestFS(); + if (getConflictResolutionMode(context, fs.getConf()) + == ConflictResolution.FAIL + && fs.exists(outputPath)) { + LOG.debug("Failing commit by job {} to write" + + " to existing output path {}", + context.getJobID(), getOutputPath()); + throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS); + } + } + + /** + * Pre-commit actions for a job.
+ * Here: look at the conflict resolution mode and choose + * an action based on the current policy. + * @param context job context + * @param pending pending commits + * @throws IOException any failure + */ + @Override + protected void preCommitJob(JobContext context, + List<SinglePendingCommit> pending) throws IOException { + Path outputPath = getOutputPath(); + FileSystem fs = getDestFS(); + Configuration fsConf = fs.getConf(); + switch (getConflictResolutionMode(context, fsConf)) { + case FAIL: + // this was checked in setupJob, but this avoids some cases where + // output was created while the job was processing + if (fs.exists(outputPath)) { + throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS); + } + break; + case APPEND: + // do nothing + break; + case REPLACE: + if (fs.delete(outputPath, true /* recursive */)) { + LOG.info("{}: removed output path to be replaced: {}", + getRole(), outputPath); + } + break; + default: + throw new IOException(getRole() + ": unknown conflict resolution mode: " + + getConflictResolutionMode(context, fsConf)); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java new file mode 100644 index 0000000..bfa8914 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; + +/** + * Factory for the Directory committer. + */ +public class DirectoryStagingCommitterFactory + extends AbstractS3ACommitterFactory { + + /** + * Name of this class: {@value}. 
+ */ + public static final String CLASSNAME + = "org.apache.hadoop.fs.s3a.commit.staging" + + ".DirectoryStagingCommitterFactory"; + + public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem, + Path outputPath, + TaskAttemptContext context) throws IOException { + return new DirectoryStagingCommitter(outputPath, context); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java new file mode 100644 index 0000000..bfaf443 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*; + +/** + * Partitioned committer. + * This writes data to specific "partition" subdirectories, applying + * conflict resolution on a partition-by-partition basis. The existence + * and state of any parallel partitions for which there are no output + * files are not considered in the conflict resolution. + * + * The conflict policy is + * <ul> + * <li>FAIL: fail the commit if any of the partitions have data.</li> + * <li>APPEND: add extra data to the destination partitions.</li> + * <li>REPLACE: delete the destination partition in the job commit + * (i.e. after and only if all tasks have succeeded).</li> + * </ul> + */ +public class PartitionedStagingCommitter extends StagingCommitter { + + private static final Logger LOG = LoggerFactory.getLogger( + PartitionedStagingCommitter.class); + + /** Name: {@value}.
*/ + public static final String NAME = COMMITTER_NAME_PARTITIONED; + + public PartitionedStagingCommitter(Path outputPath, + TaskAttemptContext context) + throws IOException { + super(outputPath, context); + } + + @Override + public String getName() { + return NAME; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "PartitionedStagingCommitter{"); + sb.append(super.toString()); + sb.append('}'); + return sb.toString(); + } + + @Override + protected int commitTaskInternal(TaskAttemptContext context, + List<? extends FileStatus> taskOutput) throws IOException { + Path attemptPath = getTaskAttemptPath(context); + Set<String> partitions = Paths.getPartitions(attemptPath, taskOutput); + + // enforce conflict resolution, but only if the mode is FAIL. for APPEND, + // it doesn't matter that the partitions are already there, and for REPLACE, + // deletion should be done during job commit. + FileSystem fs = getDestFS(); + if (getConflictResolutionMode(context, fs.getConf()) + == ConflictResolution.FAIL) { + for (String partition : partitions) { + // getFinalPath adds the UUID to the file name. this needs the parent. + Path partitionPath = getFinalPath(partition + "/file", + context).getParent(); + if (fs.exists(partitionPath)) { + LOG.debug("Failing commit by task attempt {} to write" + + " to existing path {} under {}", + context.getTaskAttemptID(), partitionPath, getOutputPath()); + throw new PathExistsException(partitionPath.toString(), + E_DEST_EXISTS); + } + } + } + return super.commitTaskInternal(context, taskOutput); + } + + /** + * Job-side conflict resolution. 
+ * The partition path conflict resolution actions are: + * <ol> + * <li>FAIL: assume checking has taken place earlier; no more checks.</li> + * <li>APPEND: allowed; no need to check.</li> + * <li>REPLACE: deletes the partition directory of every pending commit.</li> + * </ol> + * @param context job context + * @param pending the pending operations + * @throws IOException any failure + */ + @Override + protected void preCommitJob(JobContext context, + List<SinglePendingCommit> pending) throws IOException { + + FileSystem fs = getDestFS(); + + // enforce conflict resolution + Configuration fsConf = fs.getConf(); + switch (getConflictResolutionMode(context, fsConf)) { + case FAIL: + // FAIL checking is done on the task side, so this does nothing + break; + case APPEND: + // no check is needed because the output may exist for appending + break; + case REPLACE: + Set<Path> partitions = pending.stream() + .map(SinglePendingCommit::destinationPath) + .map(Path::getParent) + .collect(Collectors.toCollection(Sets::newLinkedHashSet)); + for (Path partitionPath : partitions) { + LOG.debug("{}: removing partition path to be replaced: {}", + getRole(), partitionPath); + fs.delete(partitionPath, true); + } + break; + default: + throw new PathCommitException("", + getRole() + ": unknown conflict resolution mode: " + + getConflictResolutionMode(context, fsConf)); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org