http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java new file mode 100644 index 0000000..47d112d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.test.LambdaTestUtils.*; +import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; + +/** + * Tests for {@link MagicCommitPaths} path operations. 
+ */ +public class TestMagicCommitPaths extends Assert { + + private static final List<String> MAGIC_AT_ROOT = + list(MAGIC); + private static final List<String> MAGIC_AT_ROOT_WITH_CHILD = + list(MAGIC, "child"); + private static final List<String> MAGIC_WITH_CHILD = + list("parent", MAGIC, "child"); + private static final List<String> MAGIC_AT_WITHOUT_CHILD = + list("parent", MAGIC); + + private static final List<String> DEEP_MAGIC = + list("parent1", "parent2", MAGIC, "child1", "child2"); + + public static final String[] EMPTY = {}; + + @Test + public void testSplitPathEmpty() throws Throwable { + intercept(IllegalArgumentException.class, + () -> splitPathToElements(new Path(""))); + } + + @Test + public void testSplitPathDoubleBackslash() { + assertPathSplits("//", EMPTY); + } + + @Test + public void testSplitRootPath() { + assertPathSplits("/", EMPTY); + } + + @Test + public void testSplitBasic() { + assertPathSplits("/a/b/c", + new String[]{"a", "b", "c"}); + } + + @Test + public void testSplitTrailingSlash() { + assertPathSplits("/a/b/c/", + new String[]{"a", "b", "c"}); + } + + @Test + public void testSplitShortPath() { + assertPathSplits("/a", + new String[]{"a"}); + } + + @Test + public void testSplitShortPathTrailingSlash() { + assertPathSplits("/a/", + new String[]{"a"}); + } + + @Test + public void testParentsMagicRoot() { + assertParents(EMPTY, MAGIC_AT_ROOT); + } + + @Test + public void testChildrenMagicRoot() { + assertChildren(EMPTY, MAGIC_AT_ROOT); + } + + @Test + public void testParentsMagicRootWithChild() { + assertParents(EMPTY, MAGIC_AT_ROOT_WITH_CHILD); + } + + @Test + public void testChildMagicRootWithChild() { + assertChildren(a("child"), MAGIC_AT_ROOT_WITH_CHILD); + } + + @Test + public void testChildrenMagicWithoutChild() { + assertChildren(EMPTY, MAGIC_AT_WITHOUT_CHILD); + } + + @Test + public void testChildMagicWithChild() { + assertChildren(a("child"), MAGIC_WITH_CHILD); + } + + @Test + public void testParentMagicWithChild() { + assertParents(a("parent"), MAGIC_WITH_CHILD); + } + + @Test + public void testParentDeepMagic() { + assertParents(a("parent1", "parent2"), DEEP_MAGIC); + } + + @Test + public void testChildrenDeepMagic() { + assertChildren(a("child1", "child2"), DEEP_MAGIC); + } + + @Test + public void testLastElementEmpty() throws Throwable { + intercept(IllegalArgumentException.class, + () -> lastElement(new ArrayList<>(0))); + } + + @Test + public void testLastElementSingle() { + assertEquals("first", lastElement(l("first"))); + } + + @Test + public void testLastElementDouble() { + assertEquals("2", lastElement(l("first", "2"))); + } + + @Test + public void testFinalDestinationNoMagic() { + assertEquals(l("first", "2"), + finalDestination(l("first", "2"))); + } + + @Test + public void testFinalDestinationMagic1() { + assertEquals(l("first", "2"), + finalDestination(l("first", MAGIC, "2"))); + } + + @Test + public void testFinalDestinationMagic2() { + assertEquals(l("first", "3.txt"), + finalDestination(l("first", MAGIC, "2", "3.txt"))); + } + + @Test + public void testFinalDestinationRootMagic2() { + assertEquals(l("3.txt"), + finalDestination(l(MAGIC, "2", "3.txt"))); + } + + @Test(expected = IllegalArgumentException.class) + public void testFinalDestinationMagicNoChild() { + finalDestination(l(MAGIC)); + } + + @Test + public void testFinalDestinationBaseDirectChild() { + finalDestination(l(MAGIC, BASE, "3.txt")); + } + + @Test(expected = IllegalArgumentException.class) + public void testFinalDestinationBaseNoChild() { + assertEquals(l(), 
finalDestination(l(MAGIC, BASE))); + } + + @Test + public void testFinalDestinationBaseSubdirsChild() { + assertEquals(l("2", "3.txt"), + finalDestination(l(MAGIC, "4", BASE, "2", "3.txt"))); + } + + /** + * If the base is above the magic dir, it's ignored. + */ + @Test + public void testFinalDestinationIgnoresBaseBeforeMagic() { + assertEquals(l(BASE, "home", "3.txt"), + finalDestination(l(BASE, "home", MAGIC, "2", "3.txt"))); + } + + /** varargs to array. */ + private static String[] a(String... str) { + return str; + } + + /** list to array. */ + private static List<String> l(String... str) { + return Arrays.asList(str); + } + + /** + * Varargs to list. + * @param args arguments + * @return a list + */ + private static List<String> list(String... args) { + return Lists.newArrayList(args); + } + + public void assertParents(String[] expected, List<String> elements) { + assertListEquals(expected, magicPathParents(elements)); + } + + public void assertChildren(String[] expected, List<String> elements) { + assertListEquals(expected, magicPathChildren(elements)); + } + + private void assertPathSplits(String pathString, String[] expected) { + Path path = new Path(pathString); + assertArrayEquals("From path " + path, expected, + splitPathToElements(path).toArray()); + } + + private void assertListEquals(String[] expected, List<String> actual) { + assertArrayEquals(expected, actual.toArray()); + } + +}
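The tests above pin down the path mapping at the heart of the magic committer: elements before the magic directory form the destination's parent path, then either everything after a base marker among the children is appended underneath it, or, with no base marker, only the final filename survives. A minimal, self-contained sketch of that mapping follows; it assumes MAGIC resolves to "__magic" and BASE to "__base" (the real values live in CommitConstants) and makes no claim to be the actual MagicCommitPaths implementation.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MagicPathSketch {
  // Assumed values; the real ones are defined in CommitConstants.
  private static final String MAGIC = "__magic";
  private static final String BASE = "__base";

  /**
   * Map the elements of a path written under a magic directory to the
   * elements of its final destination: parents before MAGIC are kept,
   * then either everything after the first BASE marker among the
   * children or, if there is no BASE, just the final filename.
   */
  static List<String> sketchFinalDestination(List<String> elements) {
    int magic = elements.indexOf(MAGIC);
    if (magic < 0) {
      return elements;                       // not a magic path: unchanged
    }
    List<String> result = new ArrayList<>(elements.subList(0, magic));
    List<String> children = elements.subList(magic + 1, elements.size());
    int base = children.indexOf(BASE);
    if (base < 0) {
      // no __base marker: only the filename survives
      result.add(children.get(children.size() - 1));
    } else {
      // the tree under __base is preserved relative to the destination
      result.addAll(children.subList(base + 1, children.size()));
    }
    return result;
  }

  public static void main(String[] args) {
    // ["first", "__magic", "2", "3.txt"]        -> [first, 3.txt]
    System.out.println(sketchFinalDestination(
        Arrays.asList("first", MAGIC, "2", "3.txt")));
    // ["__magic", "4", "__base", "2", "3.txt"]  -> [2, 3.txt]
    System.out.println(sketchFinalDestination(
        Arrays.asList(MAGIC, "4", BASE, "2", "3.txt")));
  }
}

This is the behaviour asserted by testFinalDestinationMagic2 and testFinalDestinationBaseSubdirsChild: a task can write under a magic path below the destination directory while the committed data ends up at the user-visible location.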
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java new file mode 100644 index 0000000..5e6fb82 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.test.HadoopTestBase; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test Tasks class. + */ +@RunWith(Parameterized.class) +public class TestTasks extends HadoopTestBase { + private static final Logger LOG = LoggerFactory.getLogger(TestTasks.class); + public static final int ITEM_COUNT = 16; + private static final int FAILPOINT = 8; + + private final int numThreads; + /** + * Thread pool for task execution. + */ + private ExecutorService threadPool; + private final CounterTask failingTask + = new CounterTask("failing committer", FAILPOINT, Item::commit); + + private final FailureCounter failures + = new FailureCounter("failures", 0, null); + private final CounterTask reverter + = new CounterTask("reverter", 0, Item::revert); + private final CounterTask aborter + = new CounterTask("aborter", 0, Item::abort); + + /** + * Test array for parameterized test runs: how many threads and + * to use. Threading makes some of the assertions brittle; there are + * more checks on single thread than parallel ops. + * @return a list of parameter tuples. + */ + @Parameterized.Parameters + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][]{ + {0}, + {1}, + {3}, + {8}, + {16}, + }); + } + + + private List<Item> items; + + + /** + * Construct the parameterized test. 
+ * @param numThreads number of threads + */ + public TestTasks(int numThreads) { + this.numThreads = numThreads; + } + + /** + * In a parallel test run there is more than one thread doing the execution. + * @return true if the threadpool size is >1 + */ + public boolean isParallel() { + return numThreads > 1; + } + + @Before + public void setup() { + items = IntStream.rangeClosed(1, ITEM_COUNT) + .mapToObj(i -> new Item(i, + String.format("With %d threads", numThreads))) + .collect(Collectors.toList()); + + if (numThreads > 0) { + threadPool = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(getMethodName() + "-pool-%d") + .build()); + } + + } + + @After + public void teardown() { + if (threadPool != null) { + threadPool.shutdown(); + threadPool = null; + } + } + + /** + * create the builder. + * @return pre-inited builder + */ + private Tasks.Builder<Item> builder() { + return Tasks.foreach(items).executeWith(threadPool); + } + + private void assertRun(Tasks.Builder<Item> builder, + CounterTask task) throws IOException { + boolean b = builder.run(task); + assertTrue("Run of " + task + " failed", b); + } + + private void assertFailed(Tasks.Builder<Item> builder, + CounterTask task) throws IOException { + boolean b = builder.run(task); + assertFalse("Run of " + task + " unexpectedly succeeded", b); + } + + private String itemsToString() { + return "[" + items.stream().map(Item::toString) + .collect(Collectors.joining("\n")) +"]"; + } + + @Test + public void testSimpleInvocation() throws Throwable { + CounterTask t = new CounterTask("simple", 0, Item::commit); + assertRun(builder(), t); + t.assertInvoked("", ITEM_COUNT); + } + + @Test + public void testFailNoStoppingSuppressed() throws Throwable { + assertFailed(builder().suppressExceptions(), failingTask); + failingTask.assertInvoked("Continued through operations", ITEM_COUNT); + items.forEach(Item::assertCommittedOrFailed); + } + + @Test + public void testFailFastSuppressed() throws Throwable { + assertFailed(builder() + .suppressExceptions() + .stopOnFailure(), + failingTask); + if (isParallel()) { + failingTask.assertInvokedAtLeast("stop fast", FAILPOINT); + } else { + failingTask.assertInvoked("stop fast", FAILPOINT); + } + } + + @Test + public void testFailedCallAbortSuppressed() throws Throwable { + assertFailed(builder() + .stopOnFailure() + .suppressExceptions() + .abortWith(aborter), + failingTask); + failingTask.assertInvokedAtLeast("success", FAILPOINT); + if (!isParallel()) { + aborter.assertInvokedAtLeast("abort", 1); + // all uncommitted items were aborted + items.stream().filter(i -> !i.committed) + .map(Item::assertAborted); + items.stream().filter(i -> i.committed) + .forEach(i -> assertFalse(i.toString(), i.aborted)); + } + } + + @Test + public void testFailedCalledWhenNotStoppingSuppressed() throws Throwable { + assertFailed(builder() + .suppressExceptions() + .onFailure(failures), + failingTask); + failingTask.assertInvokedAtLeast("success", FAILPOINT); + // only one failure was triggered + failures.assertInvoked("failure event", 1); + } + + @Test + public void testFailFastCallRevertSuppressed() throws Throwable { + assertFailed(builder() + .stopOnFailure() + .revertWith(reverter) + .abortWith(aborter) + .suppressExceptions() + .onFailure(failures), + failingTask); + failingTask.assertInvokedAtLeast("success", FAILPOINT); + if (!isParallel()) { + aborter.assertInvokedAtLeast("abort", 1); + // all uncommitted items were aborted + items.stream().filter(i -> 
!i.committed) + .filter(i -> !i.failed) + .forEach(Item::assertAborted); + } + // all committed were reverted + items.stream().filter(i -> i.committed && !i.failed) + .forEach(Item::assertReverted); + // all reverted items are committed + items.stream().filter(i -> i.reverted) + .forEach(Item::assertCommitted); + + // only one failure was triggered + failures.assertInvoked("failure event", 1); + } + + @Test + public void testFailSlowCallRevertSuppressed() throws Throwable { + assertFailed(builder() + .suppressExceptions() + .revertWith(reverter) + .onFailure(failures), + failingTask); + failingTask.assertInvokedAtLeast("success", FAILPOINT); + // all committed were reverted + // identify which task failed from the set + int failing = failures.getItem().id; + items.stream() + .filter(i -> i.id != failing) + .filter(i -> i.committed) + .forEach(Item::assertReverted); + // all reverted items are committed + items.stream().filter(i -> i.reverted) + .forEach(Item::assertCommitted); + + // only one failure was triggered + failures.assertInvoked("failure event", 1); + } + + @Test + public void testFailFastExceptions() throws Throwable { + intercept(IOException.class, + () -> builder() + .stopOnFailure() + .run(failingTask)); + if (isParallel()) { + failingTask.assertInvokedAtLeast("stop fast", FAILPOINT); + } else { + failingTask.assertInvoked("stop fast", FAILPOINT); + } + } + + @Test + public void testFailSlowExceptions() throws Throwable { + intercept(IOException.class, + () -> builder() + .run(failingTask)); + failingTask.assertInvoked("continued through operations", ITEM_COUNT); + items.forEach(Item::assertCommittedOrFailed); + } + + @Test + public void testFailFastExceptionsWithAbortFailure() throws Throwable { + CounterTask failFirst = new CounterTask("task", 1, Item::commit); + CounterTask a = new CounterTask("aborter", 1, Item::abort); + intercept(IOException.class, + () -> builder() + .stopOnFailure() + .abortWith(a) + .run(failFirst)); + if (!isParallel()) { + // expect the other tasks to be aborted + a.assertInvokedAtLeast("abort", ITEM_COUNT - 1); + } + } + + @Test + public void testFailFastExceptionsWithAbortFailureStopped() throws Throwable { + CounterTask failFirst = new CounterTask("task", 1, Item::commit); + CounterTask a = new CounterTask("aborter", 1, Item::abort); + intercept(IOException.class, + () -> builder() + .stopOnFailure() + .stopAbortsOnFailure() + .abortWith(a) + .run(failFirst)); + if (!isParallel()) { + // expect the other tasks to be aborted + a.assertInvoked("abort", 1); + } + } + + /** + * Fail the last one committed; all the rest will be reverted. + * The actual ID of the last task has to be picked up from the + * failure callback, as in the pool it may be any of them.
+ */ + @Test + public void testRevertAllSuppressed() throws Throwable { + CounterTask failLast = new CounterTask("task", ITEM_COUNT, Item::commit); + + assertFailed(builder() + .suppressExceptions() + .stopOnFailure() + .revertWith(reverter) + .abortWith(aborter) + .onFailure(failures), + failLast); + failLast.assertInvoked("success", ITEM_COUNT); + int abCount = aborter.getCount(); + int revCount = reverter.getCount(); + assertEquals(ITEM_COUNT, 1 + abCount + revCount); + // identify which task failed from the set + int failing = failures.getItem().id; + // all committed were reverted + items.stream() + .filter(i -> i.id != failing) + .filter(i -> i.committed) + .forEach(Item::assertReverted); + items.stream() + .filter(i -> i.id != failing) + .filter(i -> !i.committed) + .forEach(Item::assertAborted); + // all reverted items are committed + items.stream().filter(i -> i.reverted) + .forEach(Item::assertCommitted); + + // only one failure was triggered + failures.assertInvoked("failure event", 1); + } + + + /** + * The Item which tasks process. + */ + private final class Item { + private final int id; + private final String text; + + private volatile boolean committed, aborted, reverted, failed; + + private Item(int item, String text) { + this.id = item; + this.text = text; + } + + boolean commit() { + committed = true; + return true; + } + + boolean abort() { + aborted = true; + return true; + } + + boolean revert() { + reverted = true; + return true; + } + + boolean fail() { + failed = true; + return true; + } + + public Item assertCommitted() { + assertTrue(toString() + " was not committed in\n" + + itemsToString(), + committed); + return this; + } + + public Item assertCommittedOrFailed() { + assertTrue(toString() + " was not committed nor failed in\n" + + itemsToString(), + committed || failed); + return this; + } + + public Item assertAborted() { + assertTrue(toString() + " was not aborted in\n" + + itemsToString(), + aborted); + return this; + } + + public Item assertReverted() { + assertTrue(toString() + " was not reverted in\n" + + itemsToString(), + reverted); + return this; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Item{"); + sb.append(String.format("[%02d]", id)); + sb.append(", committed=").append(committed); + sb.append(", aborted=").append(aborted); + sb.append(", reverted=").append(reverted); + sb.append(", failed=").append(failed); + sb.append(", text=").append(text); + sb.append('}'); + return sb.toString(); + } + } + + /** + * Class which can count invocations and, if limit > 0, will raise + * an exception on the specific invocation of {@link #note(Object)} + * whose count == limit. + */ + private class BaseCounter { + private final AtomicInteger counter = new AtomicInteger(0); + private final int limit; + private final String name; + private Item item; + private final Optional<Function<Item, Boolean>> action; + + /** + * Base counter, tracks items. + * @param name name for string/exception/logs. + * @param limit limit at which an exception is raised, 0 == never + * @param action optional action to invoke after the increment, + * before limit check + */ + BaseCounter(String name, + int limit, + Function<Item, Boolean> action) { + this.name = name; + this.limit = limit; + this.action = Optional.ofNullable(action); + } + + /** + * Apply the action to an item; log at info afterwards with both the + * before and after string values of the item. + * @param i item to process. 
+ * @throws IOException failure in the action + */ + void process(Item i) throws IOException { + this.item = i; + int count = counter.incrementAndGet(); + if (limit == count) { + i.fail(); + LOG.info("{}: Failed {}", this, i); + throw new IOException(String.format("%s: Limit %d reached for %s", + this, limit, i)); + } + String before = i.toString(); + action.map(a -> a.apply(i)); + LOG.info("{}: {} -> {}", this, before, i); + } + + int getCount() { + return counter.get(); + } + + Item getItem() { + return item; + } + + void assertInvoked(String text, int expected) { + assertEquals(toString() + ": " + text, expected, getCount()); + } + + void assertInvokedAtLeast(String text, int expected) { + int actual = getCount(); + assertTrue(toString() + ": " + text + + "-expected " + expected + + " invocations, but got " + actual + + " in " + itemsToString(), + expected <= actual); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "BaseCounter{"); + sb.append("name='").append(name).append('\''); + sb.append(", count=").append(counter.get()); + sb.append(", limit=").append(limit); + sb.append(", item=").append(item); + sb.append('}'); + return sb.toString(); + } + } + + private final class CounterTask + extends BaseCounter implements Tasks.Task<Item, IOException> { + + private CounterTask(String name, int limit, + Function<Item, Boolean> action) { + super(name, limit, action); + } + + @Override + public void run(Item item) throws IOException { + process(item); + } + + } + + private final class FailureCounter + extends BaseCounter implements Tasks.FailureTask<Item, IOException> { + private Exception exception; + + private FailureCounter(String name, int limit, + Function<Item, Boolean> action) { + super(name, limit, action); + } + + @Override + public void run(Item item, Exception ex) throws IOException { + process(item); + this.exception = ex; + } + + private Exception getException() { + return exception; + } + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java new file mode 100644 index 0000000..57eb8b2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; +import org.apache.hadoop.fs.s3a.commit.files.SuccessData; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; + +/** + * Full integration test for the Magic Committer. + * + * There's no need to disable the committer setting for the filesystem here, + * because the committers are being instantiated in their own processes; + * the settings in {@link #applyCustomConfigOptions(Configuration)} are + * passed down to these processes. + */ +public class ITMagicCommitMRJob extends AbstractITCommitMRJob { + + /** + * Need consistency here. + * @return false + */ + @Override + public boolean useInconsistentClient() { + return false; + } + + @Override + protected String committerName() { + return MagicS3GuardCommitter.NAME; + } + + /** + * Turn on the magic commit support for the FS, else nothing will work. + * @param conf configuration + */ + @Override + protected void applyCustomConfigOptions(Configuration conf) { + conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); + } + + /** + * Check that the magic dir was cleaned up. + * {@inheritDoc} + */ + @Override + protected void customPostExecutionValidation(Path destPath, + SuccessData successData) throws Exception { + assertPathDoesNotExist("No cleanup", new Path(destPath, MAGIC)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java new file mode 100644 index 0000000..74c1d9d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.CommitUtils; +import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection; +import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; + +/** + * Test the magic committer's commit protocol. + */ +public class ITestMagicCommitProtocol extends AbstractITCommitProtocol { + + @Override + protected String suitename() { + return "ITestMagicCommitProtocol"; + } + + /** + * Need consistency here. + * @return false + */ + @Override + public boolean useInconsistentClient() { + return false; + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); + return conf; + } + + @Override + protected String getCommitterFactoryName() { + return CommitConstants.S3A_COMMITTER_FACTORY; + } + + @Override + protected String getCommitterName() { + return CommitConstants.COMMITTER_NAME_MAGIC; + } + + @Override + public void setup() throws Exception { + super.setup(); + CommitUtils.verifyIsMagicCommitFS(getFileSystem()); + } + + @Override + public void assertJobAbortCleanedUp(JobData jobData) + throws Exception { + // special handling of magic directory; harmless in staging + Path magicDir = new Path(getOutDir(), MAGIC); + ContractTestUtils.assertPathDoesNotExist(getFileSystem(), + "magic dir ", magicDir); + super.assertJobAbortCleanedUp(jobData); + } + + @Override + protected AbstractS3ACommitter createCommitter( + Path outputPath, + TaskAttemptContext context) + throws IOException { + return new MagicS3GuardCommitter(outputPath, context); + } + + public AbstractS3ACommitter createFailingCommitter( + TaskAttemptContext tContext) throws IOException { + return new CommitterWithFailedThenSucceed(getOutDir(), tContext); + } + + protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException { + String pathStr = p.toString(); + assertTrue("not magic " + pathStr, + pathStr.contains(MAGIC)); + assertPathDoesNotExist("task attempt visible", p); + } + + protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException { + FileStatus st = getFileSystem().getFileStatus(p); + assertEquals("file length in " + st, 0, st.getLen()); + Path pendingFile = new Path(p.toString() + PENDING_SUFFIX); + assertPathExists("pending file", pendingFile); + } + + /** + * The class provides a overridden implementation of commitJobInternal which + * causes the commit failed for the first time then succeed. 
+ */ + + private static final class CommitterWithFailedThenSucceed extends + MagicS3GuardCommitter implements CommitterFaultInjection { + private final CommitterFaultInjectionImpl injection; + + CommitterWithFailedThenSucceed(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + injection = new CommitterFaultInjectionImpl(outputPath, context, true); + } + + @Override + public void setupJob(JobContext context) throws IOException { + injection.setupJob(context); + super.setupJob(context); + } + + @Override + public void abortJob(JobContext context, JobStatus.State state) + throws IOException { + injection.abortJob(context, state); + super.abortJob(context, state); + } + + @Override + @SuppressWarnings("deprecation") + public void cleanupJob(JobContext context) throws IOException { + injection.cleanupJob(context); + super.cleanupJob(context); + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + injection.setupTask(context); + super.setupTask(context); + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + injection.commitTask(context); + super.commitTask(context); + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + injection.abortTask(context); + super.abortTask(context); + } + + @Override + public void commitJob(JobContext context) throws IOException { + injection.commitJob(context); + super.commitJob(context); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) + throws IOException { + injection.needsTaskCommit(context); + return super.needsTaskCommit(context); + } + + @Override + public void setFaults(CommitterFaultInjection.Faults... faults) { + injection.setFaults(faults); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java new file mode 100644 index 0000000..e3a295b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.CommitOperations; +import org.apache.hadoop.fs.s3a.commit.CommitUtils; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; + + +/** + * Write a huge file via the magic commit mechanism, + * commit it and verify that it is there. This is needed to + * verify that the pending-upload mechanism works with multipart files + * of more than one part. + * + * This is a scale test. + */ +public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles { + private static final Logger LOG = LoggerFactory.getLogger( + ITestS3AHugeMagicCommits.class); + + private Path magicDir; + private Path jobDir; + + /** file used as the destination for the write; + * it is never actually created. */ + private Path magicOutputFile; + + /** The file with the JSON data about the commit. */ + private Path pendingDataFile; + + /** + * Use fast upload on disk. + * @return the upload buffer mechanism. + */ + protected String getBlockOutputBufferName() { + return Constants.FAST_UPLOAD_BUFFER_DISK; + } + + /** + * The suite name; required to be unique. + * @return the test suite name + */ + @Override + public String getTestSuiteName() { + return "ITestS3AHugeMagicCommits"; + } + + /** + * Create the scale IO conf with the committer enabled. + * @return the configuration to use for the test FS. + */ + @Override + protected Configuration createScaleConfiguration() { + Configuration conf = super.createScaleConfiguration(); + conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); + return conf; + } + + @Override + public void setup() throws Exception { + super.setup(); + CommitUtils.verifyIsMagicCommitFS(getFileSystem()); + + // set up the paths for the commit operation + Path finalDirectory = new Path(getScaleTestDir(), "commit"); + magicDir = new Path(finalDirectory, MAGIC); + jobDir = new Path(magicDir, "job_001"); + String filename = "commit.bin"; + setHugefile(new Path(finalDirectory, filename)); + magicOutputFile = new Path(jobDir, filename); + pendingDataFile = new Path(jobDir, filename + PENDING_SUFFIX); + } + + /** + * Returns the path to the commit metadata file, not that of the huge file. 
+ * @return a file in the job dir + */ + @Override + protected Path getPathOfFileToCreate() { + return magicOutputFile; + } + + @Override + public void test_030_postCreationAssertions() throws Throwable { + describe("Committing file"); + assertPathDoesNotExist("final file exists", getHugefile()); + assertPathExists("No pending file", pendingDataFile); + S3AFileSystem fs = getFileSystem(); + + // as a 0-byte marker is created, there is a file at the end path, + // it just MUST be 0-bytes long + FileStatus status = fs.getFileStatus(magicOutputFile); + assertEquals("Non empty marker file " + status, + 0, status.getLen()); + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + CommitOperations operations = new CommitOperations(fs); + Path destDir = getHugefile().getParent(); + assertPathExists("Magic dir", new Path(destDir, CommitConstants.MAGIC)); + String destDirKey = fs.pathToKey(destDir); + List<String> uploads = listMultipartUploads(fs, destDirKey); + + assertEquals("Pending uploads: " + + uploads.stream() + .collect(Collectors.joining("\n")), 1, uploads.size()); + assertNotNull("jobDir", jobDir); + Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> + results = operations.loadSinglePendingCommits(jobDir, false); + for (SinglePendingCommit singlePendingCommit : + results.getKey().getCommits()) { + operations.commitOrFail(singlePendingCommit); + } + timer.end("time to commit %s", pendingDataFile); + // upload is no longer pending + uploads = listMultipartUploads(fs, destDirKey); + assertEquals("Pending uploads" + + uploads.stream().collect(Collectors.joining("\n")), + 0, operations.listPendingUploadsUnderPath(destDir).size()); + // at this point, the huge file exists, so the normal assertions + // on that file must be valid. Verify. + super.test_030_postCreationAssertions(); + } + + private void skipQuietly(String text) { + describe("Skipping: %s", text); + } + + @Override + public void test_040_PositionedReadHugeFile() { + skipQuietly("test_040_PositionedReadHugeFile"); + } + + @Override + public void test_050_readHugeFile() { + skipQuietly("readHugeFile"); + } + + @Override + public void test_100_renameHugeFile() { + skipQuietly("renameHugeFile"); + } + + @Override + public void test_800_DeleteHugeFiles() throws IOException { + if (getFileSystem() != null) { + try { + getFileSystem().abortOutstandingMultipartUploads(0); + } catch (IOException e) { + LOG.info("Exception while purging old uploads", e); + } + } + try { + super.test_800_DeleteHugeFiles(); + } finally { + ContractTestUtils.rm(getFileSystem(), magicDir, true, false); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java new file mode 100644 index 0000000..47383b7 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.MockS3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.ClientErrors; +import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.ClientResults; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Committer subclass that uses a mocked S3A connection for testing. + */ +class MockedStagingCommitter extends StagingCommitter { + + MockedStagingCommitter(Path outputPath, + TaskAttemptContext context) + throws IOException { + super(outputPath, context); + } + + /** + * Returns the mock FS without checking FS type. + * @param out output path + * @param config job/task config + * @return a filesystem. + * @throws IOException IO failure + */ + @Override + protected FileSystem getDestinationFS(Path out, Configuration config) + throws IOException { + return out.getFileSystem(config); + } + + @Override + public void commitJob(JobContext context) throws IOException { + // turn off stamping an output marker, as that codepath isn't mocked yet. 
+ super.commitJob(context); + Configuration conf = context.getConfiguration(); + try { + String jobCommitterPath = conf.get("mock-results-file"); + if (jobCommitterPath != null) { + try (ObjectOutputStream out = new ObjectOutputStream( + FileSystem.getLocal(conf) + .create(new Path(jobCommitterPath), false))) { + out.writeObject(getResults()); + } + } + } catch (Exception e) { + // do nothing, the test will fail + } + } + + @Override + protected void maybeCreateSuccessMarker(JobContext context, + List<String> filenames) + throws IOException { + //skipped + } + + public ClientResults getResults() throws IOException { + MockS3AFileSystem mockFS = (MockS3AFileSystem)getDestS3AFS(); + return mockFS.getOutcome().getKey(); + } + + public ClientErrors getErrors() throws IOException { + MockS3AFileSystem mockFS = (MockS3AFileSystem) getDestS3AFS(); + return mockFS.getOutcome().getValue(); + } + + @Override + public String toString() { + return "MockedStagingCommitter{ " + super.toString() + " "; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedCommitterForTesting.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedCommitterForTesting.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedCommitterForTesting.java new file mode 100644 index 0000000..5adce15 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedCommitterForTesting.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Partitioned committer overridden for better testing. + */ +class PartitionedCommitterForTesting extends + PartitionedStagingCommitter { + + PartitionedCommitterForTesting(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + @Override + protected void initOutput(Path out) throws IOException { + super.initOutput(out); + setOutputPath(out); + } + + /** + * Returns the mock FS without checking FS type. + * @param out output path + * @param config job/task config + * @return a filesystem. 
+ * @throws IOException failure to get the FS + */ + @Override + protected FileSystem getDestinationFS(Path out, Configuration config) + throws IOException { + return out.getFileSystem(config); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java new file mode 100644 index 0000000..38d5156 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java @@ -0,0 +1,724 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.DeleteObjectRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; +import com.amazonaws.services.s3.model.MultipartUpload; +import com.amazonaws.services.s3.model.MultipartUploadListing; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.MockS3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractCommitITest; +import 
org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; +import org.apache.hadoop.fs.s3a.commit.MiniDFSClusterService; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.test.HadoopTestBase; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +/** + * Test base for mock tests of staging committers: + * core constants and static methods, inner classes + * for specific test types. + * + * Some of the verification methods here are unused...they are being left + * in place in case changes on the implementation make the verifications + * relevant again. + */ +public class StagingTestBase { + private static final Logger LOG = + LoggerFactory.getLogger(StagingTestBase.class); + + public static final String BUCKET = MockS3AFileSystem.BUCKET; + public static final String OUTPUT_PREFIX = "output/path"; + public static final Path OUTPUT_PATH = + new Path("s3a://" + BUCKET + "/" + OUTPUT_PREFIX); + public static final URI OUTPUT_PATH_URI = OUTPUT_PATH.toUri(); + + protected StagingTestBase() { + } + + /** + * Sets up the mock filesystem instance and binds it to the + * {@link FileSystem#get(URI, Configuration)} call for the supplied URI + * and config. + * All standard mocking setup MUST go here. + * @param conf config to use + * @param outcome tuple of outcomes to store in mock FS + * @return the filesystem created + * @throws IOException IO problems. + */ + protected static S3AFileSystem createAndBindMockFSInstance(Configuration conf, + Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> outcome) + throws IOException { + S3AFileSystem mockFs = mockS3AFileSystemRobustly(); + MockS3AFileSystem wrapperFS = new MockS3AFileSystem(mockFs, outcome); + URI uri = OUTPUT_PATH_URI; + wrapperFS.initialize(uri, conf); + FileSystemTestHelper.addFileSystemForTesting(uri, conf, wrapperFS); + return mockFs; + } + + private static S3AFileSystem mockS3AFileSystemRobustly() { + S3AFileSystem mockFS = mock(S3AFileSystem.class); + doNothing().when(mockFS).incrementReadOperations(); + doNothing().when(mockFS).incrementWriteOperations(); + doNothing().when(mockFS).incrementWriteOperations(); + doNothing().when(mockFS).incrementWriteOperations(); + return mockFS; + } + + /** + * Look up the FS by URI, return a (cast) Mock wrapper. 
+ * @param conf config + * @return the FS + * @throws IOException IO Failure + */ + public static MockS3AFileSystem lookupWrapperFS(Configuration conf) + throws IOException { + return (MockS3AFileSystem) FileSystem.get(OUTPUT_PATH_URI, conf); + } + + public static void verifyCompletion(FileSystem mockS3) throws IOException { + verifyCleanupTempFiles(mockS3); + verifyNoMoreInteractions(mockS3); + } + + public static void verifyDeleted(FileSystem mockS3, Path path) + throws IOException { + verify(mockS3).delete(path, true); + } + + public static void verifyDeleted(FileSystem mockS3, String child) + throws IOException { + verifyDeleted(mockS3, new Path(OUTPUT_PATH, child)); + } + + public static void verifyCleanupTempFiles(FileSystem mockS3) + throws IOException { + verifyDeleted(mockS3, + new Path(OUTPUT_PATH, CommitConstants.TEMPORARY)); + } + + protected static void assertConflictResolution( + StagingCommitter committer, + JobContext job, + ConflictResolution mode) { + Assert.assertEquals("Conflict resolution mode in " + committer, + mode, committer.getConflictResolutionMode(job, new Configuration())); + } + + public static void pathsExist(FileSystem mockS3, String... children) + throws IOException { + for (String child : children) { + pathExists(mockS3, new Path(OUTPUT_PATH, child)); + } + } + + public static void pathExists(FileSystem mockS3, Path path) + throws IOException { + when(mockS3.exists(path)).thenReturn(true); + } + + public static void pathDoesNotExist(FileSystem mockS3, Path path) + throws IOException { + when(mockS3.exists(path)).thenReturn(false); + } + + public static void canDelete(FileSystem mockS3, String... children) + throws IOException { + for (String child : children) { + canDelete(mockS3, new Path(OUTPUT_PATH, child)); + } + } + + public static void canDelete(FileSystem mockS3, Path f) throws IOException { + when(mockS3.delete(f, + true /* recursive */)) + .thenReturn(true); + } + + public static void verifyExistenceChecked(FileSystem mockS3, String child) + throws IOException { + verifyExistenceChecked(mockS3, new Path(OUTPUT_PATH, child)); + } + + public static void verifyExistenceChecked(FileSystem mockS3, Path path) + throws IOException { + verify(mockS3).exists(path); + } + + /** + * Provides setup/teardown of a MiniDFSCluster for tests that need one. + */ + public static class MiniDFSTest extends HadoopTestBase { + + private static MiniDFSClusterService hdfs; + + private static JobConf conf = null; + + protected static JobConf getConfiguration() { + return conf; + } + + protected static FileSystem getDFS() { + return hdfs.getClusterFS(); + } + + /** + * Setup the mini HDFS cluster. + * @throws IOException Failure + */ + @BeforeClass + @SuppressWarnings("deprecation") + public static void setupHDFS() throws IOException { + if (hdfs == null) { + JobConf c = new JobConf(); + hdfs = new MiniDFSClusterService(); + hdfs.init(c); + hdfs.start(); + conf = c; + } + } + + @SuppressWarnings("ThrowableNotThrown") + @AfterClass + public static void teardownFS() throws IOException { + ServiceOperations.stopQuietly(hdfs); + conf = null; + hdfs = null; + } + + } + + /** + * Base class for job committer tests. 
+ * @param <C> committer + */ + public abstract static class JobCommitterTest<C extends OutputCommitter> + extends HadoopTestBase { + private static final JobID JOB_ID = new JobID("job", 1); + private JobConf jobConf; + + // created in BeforeClass + private S3AFileSystem mockFS = null; + private MockS3AFileSystem wrapperFS = null; + private JobContext job = null; + + // created in Before + private StagingTestBase.ClientResults results = null; + private StagingTestBase.ClientErrors errors = null; + private AmazonS3 mockClient = null; + + @Before + public void setupJob() throws Exception { + this.jobConf = new JobConf(); + jobConf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, + UUID.randomUUID().toString()); + jobConf.setBoolean( + CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + false); + + this.job = new JobContextImpl(jobConf, JOB_ID); + this.results = new StagingTestBase.ClientResults(); + this.errors = new StagingTestBase.ClientErrors(); + this.mockClient = newMockS3Client(results, errors); + this.mockFS = createAndBindMockFSInstance(jobConf, + Pair.of(results, errors)); + this.wrapperFS = lookupWrapperFS(jobConf); + // and bind the FS + wrapperFS.setAmazonS3Client(mockClient); + } + + public S3AFileSystem getMockS3A() { + return mockFS; + } + + public MockS3AFileSystem getWrapperFS() { + return wrapperFS; + } + + public JobContext getJob() { + return job; + } + + /** + * Create a task attempt for a job by creating a stub task ID. + * @return a task attempt + */ + public TaskAttemptContext createTaskAttemptForJob() { + return AbstractCommitITest.taskAttemptForJob( + MRBuilderUtils.newJobId(1, JOB_ID.getId(), 1), job); + } + + protected StagingTestBase.ClientResults getMockResults() { + return results; + } + + protected StagingTestBase.ClientErrors getMockErrors() { + return errors; + } + + abstract C newJobCommitter() throws Exception; + } + + /** Abstract test of task commits. */ + public abstract static class TaskCommitterTest<C extends OutputCommitter> + extends JobCommitterTest<C> { + private static final TaskAttemptID AID = new TaskAttemptID( + new TaskID(JobCommitterTest.JOB_ID, TaskType.REDUCE, 2), 3); + + private C jobCommitter = null; + private TaskAttemptContext tac = null; + private File tempDir; + + @Before + public void setupTask() throws Exception { + this.jobCommitter = newJobCommitter(); + jobCommitter.setupJob(getJob()); + + this.tac = new TaskAttemptContextImpl( + new Configuration(getJob().getConfiguration()), AID); + + // get the task's configuration copy so modifications take effect + String tmp = System.getProperty( + StagingCommitterConstants.JAVA_IO_TMPDIR); + tempDir = new File(tmp); + tac.getConfiguration().set(Constants.BUFFER_DIR, tmp + "/buffer"); + tac.getConfiguration().set( + CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH, + tmp + "/cluster"); + } + + protected C getJobCommitter() { + return jobCommitter; + } + + protected TaskAttemptContext getTAC() { + return tac; + } + + abstract C newTaskCommitter() throws Exception; + + protected File getTempDir() { + return tempDir; + } + } + + /** + * Results accrued during mock runs. 
+   * This data is serialized in MR Tests and read back in the test runner.
+   */
+  public static class ClientResults implements Serializable {
+    private static final long serialVersionUID = -3137637327090709905L;
+    // For inspection of what the committer did
+    private final Map<String, InitiateMultipartUploadRequest> requests =
+        Maps.newHashMap();
+    private final List<String> uploads = Lists.newArrayList();
+    private final List<UploadPartRequest> parts = Lists.newArrayList();
+    private final Map<String, List<String>> tagsByUpload = Maps.newHashMap();
+    private final List<CompleteMultipartUploadRequest> commits =
+        Lists.newArrayList();
+    private final List<AbortMultipartUploadRequest> aborts
+        = Lists.newArrayList();
+    private final Map<String, String> activeUploads =
+        Maps.newHashMap();
+    private final List<DeleteObjectRequest> deletes = Lists.newArrayList();
+
+    public Map<String, InitiateMultipartUploadRequest> getRequests() {
+      return requests;
+    }
+
+    public List<String> getUploads() {
+      return uploads;
+    }
+
+    public List<UploadPartRequest> getParts() {
+      return parts;
+    }
+
+    public Map<String, List<String>> getTagsByUpload() {
+      return tagsByUpload;
+    }
+
+    public List<CompleteMultipartUploadRequest> getCommits() {
+      return commits;
+    }
+
+    public List<AbortMultipartUploadRequest> getAborts() {
+      return aborts;
+    }
+
+    public List<DeleteObjectRequest> getDeletes() {
+      return deletes;
+    }
+
+    public void resetDeletes() {
+      deletes.clear();
+    }
+
+    public void resetUploads() {
+      uploads.clear();
+      activeUploads.clear();
+    }
+
+    public void resetCommits() {
+      commits.clear();
+    }
+
+    public void resetRequests() {
+      requests.clear();
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          super.toString());
+      sb.append("{ requests=").append(requests.size());
+      sb.append(", uploads=").append(uploads.size());
+      sb.append(", parts=").append(parts.size());
+      sb.append(", tagsByUpload=").append(tagsByUpload.size());
+      sb.append(", commits=").append(commits.size());
+      sb.append(", aborts=").append(aborts.size());
+      sb.append(", deletes=").append(deletes.size());
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /** Control errors to raise in mock S3 client.
*/ + public static class ClientErrors { + // For injecting errors + private int failOnInit = -1; + private int failOnUpload = -1; + private int failOnCommit = -1; + private int failOnAbort = -1; + private boolean recover = false; + + public void failOnInit(int initNum) { + this.failOnInit = initNum; + } + + public void failOnUpload(int uploadNum) { + this.failOnUpload = uploadNum; + } + + public void failOnCommit(int commitNum) { + this.failOnCommit = commitNum; + } + + public void failOnAbort(int abortNum) { + this.failOnAbort = abortNum; + } + + public void recoverAfterFailure() { + this.recover = true; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "ClientErrors{"); + sb.append("failOnInit=").append(failOnInit); + sb.append(", failOnUpload=").append(failOnUpload); + sb.append(", failOnCommit=").append(failOnCommit); + sb.append(", failOnAbort=").append(failOnAbort); + sb.append(", recover=").append(recover); + sb.append('}'); + return sb.toString(); + } + + public int getFailOnInit() { + return failOnInit; + } + + public int getFailOnUpload() { + return failOnUpload; + } + + public int getFailOnCommit() { + return failOnCommit; + } + + public int getFailOnAbort() { + return failOnAbort; + } + + public boolean isRecover() { + return recover; + } + } + + /** + * Instantiate mock client with the results and errors requested. + * @param results results to accrue + * @param errors when (if any) to fail + * @return the mock client to patch in to a committer/FS instance + */ + public static AmazonS3 newMockS3Client(final ClientResults results, + final ClientErrors errors) { + AmazonS3Client mockClient = mock(AmazonS3Client.class); + final Object lock = new Object(); + + // initiateMultipartUpload + when(mockClient + .initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))) + .thenAnswer(invocation -> { + LOG.debug("initiateMultipartUpload for {}", mockClient); + synchronized (lock) { + if (results.requests.size() == errors.failOnInit) { + if (errors.recover) { + errors.failOnInit(-1); + } + throw new AmazonClientException( + "Mock Fail on init " + results.requests.size()); + } + String uploadId = UUID.randomUUID().toString(); + InitiateMultipartUploadRequest req = invocation.getArgumentAt( + 0, InitiateMultipartUploadRequest.class); + results.requests.put(uploadId, req); + results.activeUploads.put(uploadId, req.getKey()); + results.uploads.add(uploadId); + return newResult(results.requests.get(uploadId), uploadId); + } + }); + + // uploadPart + when(mockClient.uploadPart(any(UploadPartRequest.class))) + .thenAnswer(invocation -> { + LOG.debug("uploadPart for {}", mockClient); + synchronized (lock) { + if (results.parts.size() == errors.failOnUpload) { + if (errors.recover) { + errors.failOnUpload(-1); + } + LOG.info("Triggering upload failure"); + throw new AmazonClientException( + "Mock Fail on upload " + results.parts.size()); + } + UploadPartRequest req = invocation.getArgumentAt( + 0, UploadPartRequest.class); + results.parts.add(req); + String etag = UUID.randomUUID().toString(); + List<String> etags = results.tagsByUpload.get(req.getUploadId()); + if (etags == null) { + etags = Lists.newArrayList(); + results.tagsByUpload.put(req.getUploadId(), etags); + } + etags.add(etag); + return newResult(req, etag); + } + }); + + // completeMultipartUpload + when(mockClient + .completeMultipartUpload(any(CompleteMultipartUploadRequest.class))) + .thenAnswer(invocation -> { + LOG.debug("completeMultipartUpload for {}", mockClient); + 
synchronized (lock) { + if (results.commits.size() == errors.failOnCommit) { + if (errors.recover) { + errors.failOnCommit(-1); + } + throw new AmazonClientException( + "Mock Fail on commit " + results.commits.size()); + } + CompleteMultipartUploadRequest req = invocation.getArgumentAt( + 0, CompleteMultipartUploadRequest.class); + results.commits.add(req); + results.activeUploads.remove(req.getUploadId()); + + return newResult(req); + } + }); + + // abortMultipartUpload mocking + doAnswer(invocation -> { + LOG.debug("abortMultipartUpload for {}", mockClient); + synchronized (lock) { + if (results.aborts.size() == errors.failOnAbort) { + if (errors.recover) { + errors.failOnAbort(-1); + } + throw new AmazonClientException( + "Mock Fail on abort " + results.aborts.size()); + } + AbortMultipartUploadRequest req = invocation.getArgumentAt( + 0, AbortMultipartUploadRequest.class); + String id = req.getUploadId(); + String p = results.activeUploads.remove(id); + if (p == null) { + // upload doesn't exist + AmazonS3Exception ex = new AmazonS3Exception( + "not found " + id); + ex.setStatusCode(404); + throw ex; + } + results.aborts.add(req); + return null; + } + }) + .when(mockClient) + .abortMultipartUpload(any(AbortMultipartUploadRequest.class)); + + // deleteObject mocking + doAnswer(invocation -> { + LOG.debug("deleteObject for {}", mockClient); + synchronized (lock) { + results.deletes.add(invocation.getArgumentAt( + 0, DeleteObjectRequest.class)); + return null; + } + }) + .when(mockClient) + .deleteObject(any(DeleteObjectRequest.class)); + + // deleteObject mocking + doAnswer(invocation -> { + LOG.debug("deleteObject for {}", mockClient); + synchronized (lock) { + results.deletes.add(new DeleteObjectRequest( + invocation.getArgumentAt(0, String.class), + invocation.getArgumentAt(1, String.class) + )); + return null; + } + }).when(mockClient) + .deleteObject(any(String.class), any(String.class)); + + // to String returns the debug information + when(mockClient.toString()).thenAnswer( + invocation -> "Mock3AClient " + results + " " + errors); + + when(mockClient + .listMultipartUploads(any(ListMultipartUploadsRequest.class))) + .thenAnswer(invocation -> { + synchronized (lock) { + MultipartUploadListing l = new MultipartUploadListing(); + l.setMultipartUploads( + results.activeUploads.entrySet().stream() + .map(e -> newMPU(e.getKey(), e.getValue())) + .collect(Collectors.toList())); + return l; + } + }); + + return mockClient; + } + + private static CompleteMultipartUploadResult newResult( + CompleteMultipartUploadRequest req) { + return new CompleteMultipartUploadResult(); + } + + + private static MultipartUpload newMPU(String id, String path) { + MultipartUpload up = new MultipartUpload(); + up.setUploadId(id); + up.setKey(path); + return up; + } + + private static UploadPartResult newResult(UploadPartRequest request, + String etag) { + UploadPartResult result = new UploadPartResult(); + result.setPartNumber(request.getPartNumber()); + result.setETag(etag); + return result; + } + + private static InitiateMultipartUploadResult newResult( + InitiateMultipartUploadRequest request, String uploadId) { + InitiateMultipartUploadResult result = new InitiateMultipartUploadResult(); + result.setUploadId(uploadId); + return result; + } + + /** + * create files in the attempt path that should be found by + * {@code getTaskOutput}. 
+   * @param relativeFiles list of files relative to the attempt path
+   * @param attemptPath attempt path
+   * @param conf config for FS
+   * @throws IOException on any failure
+   */
+  public static void createTestOutputFiles(List<String> relativeFiles,
+      Path attemptPath,
+      Configuration conf) throws IOException {
+    // remove anything already present at the attempt path, then create
+    // the requested files under it
+    FileSystem attemptFS = attemptPath.getFileSystem(conf);
+    attemptFS.delete(attemptPath, true);
+    for (String relative : relativeFiles) {
+      // 0-length files are ignored, so write at least one byte
+      OutputStream out = attemptFS.create(new Path(attemptPath, relative));
+      out.write(34);
+      out.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestPaths.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestPaths.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestPaths.java
new file mode 100644
index 0000000..ee6480a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestPaths.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.commit.staging.Paths.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test {@link org.apache.hadoop.fs.s3a.commit.staging.Paths}.
+ */ +public class TestPaths extends HadoopTestBase { + + @Test + public void testUUIDPart() { + assertUUIDAdded("/part-0000", "/part-0000-UUID"); + } + + @Test + public void testUUIDPartSuffix() { + assertUUIDAdded("/part-0000.gz.csv", "/part-0000-UUID.gz.csv"); + } + + @Test + public void testUUIDDottedPath() { + assertUUIDAdded("/parent.dir/part-0000", "/parent.dir/part-0000-UUID"); + } + + @Test + public void testUUIDPartUUID() { + assertUUIDAdded("/part-0000-UUID.gz.csv", "/part-0000-UUID.gz.csv"); + } + + @Test + public void testUUIDParentUUID() { + assertUUIDAdded("/UUID/part-0000.gz.csv", "/UUID/part-0000.gz.csv"); + } + + @Test + public void testUUIDDir() throws Throwable { + intercept(IllegalStateException.class, + () -> addUUID("/dest/", "UUID")); + } + + @Test + public void testUUIDEmptyDir() throws Throwable { + intercept(IllegalArgumentException.class, + () -> addUUID("", "UUID")); + } + + @Test + public void testEmptyUUID() throws Throwable { + intercept(IllegalArgumentException.class, + () -> addUUID("part-0000.gz", "")); + } + + private void assertUUIDAdded(String path, String expected) { + assertEquals("from " + path, expected, addUUID(path, "UUID")); + } + + private static final String DATA = "s3a://landsat-pds/data/"; + private static final Path BASE = new Path(DATA); + + @Test + public void testRelativizeOneLevel() { + String suffix = "year=2017"; + Path path = new Path(DATA + suffix); + assertEquals(suffix, getRelativePath(BASE, path)); + } + + @Test + public void testRelativizeTwoLevel() { + String suffix = "year=2017/month=10"; + Path path = path(BASE, suffix); + assertEquals(suffix, getRelativePath(BASE, path)); + } + + @Test + public void testRelativizeSelf() { + assertEquals("", getRelativePath(BASE, BASE)); + } + + @Test + public void testRelativizeParent() { + // goes up to the parent if one is above the other + assertEquals("/", getRelativePath(BASE, BASE.getParent())); + } + + @Test + public void testGetPartition() { + assertEquals("year=2017/month=10", + getPartition("year=2017/month=10/part-0000.avro")); + } + + @Test + public void testMPUCommitDir() throws Throwable { + Configuration conf = new Configuration(); + LocalFileSystem localFS = FileSystem.getLocal(conf); + Path dir = getMultipartUploadCommitsDirectory(localFS, conf, "UUID"); + assertTrue(dir.toString().endsWith("UUID/" + + StagingCommitterConstants.STAGING_UPLOADS)); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
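A note for readers skimming only this part of the patch: the failure-injection helpers shown above (ClientResults, ClientErrors and newMockS3Client) can be exercised directly, without going through a committer. The sketch below is illustrative only and is not part of the patch; the example class name, bucket and key strings are invented, and it assumes the snippet compiles in a place where StagingTestBase and the AWS SDK test dependencies are on the classpath. Everything it calls is taken from the code in the diff above.

import static org.junit.Assert.*;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import org.junit.Test;

import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase;

/** Hypothetical usage sketch, not part of the committed patch. */
public class ExampleMockS3FailureInjection {

  @Test
  public void initFailsOnceThenRecovers() throws Exception {
    StagingTestBase.ClientResults results = new StagingTestBase.ClientResults();
    StagingTestBase.ClientErrors errors = new StagingTestBase.ClientErrors();
    // fail when one request has already been recorded, i.e. on the
    // second initiateMultipartUpload call
    errors.failOnInit(1);
    // clear the trigger once it has fired, so the retry succeeds
    errors.recoverAfterFailure();

    AmazonS3 client = StagingTestBase.newMockS3Client(results, errors);

    // first call succeeds and is recorded
    client.initiateMultipartUpload(
        new InitiateMultipartUploadRequest("example-bucket", "key-1"));

    // second call hits the injected failure exactly once
    try {
      client.initiateMultipartUpload(
          new InitiateMultipartUploadRequest("example-bucket", "key-2"));
      fail("expected an injected AmazonClientException");
    } catch (AmazonClientException expected) {
      // thrown once; recoverAfterFailure() has reset failOnInit
    }

    // the retry succeeds, so two uploads are now tracked
    client.initiateMultipartUpload(
        new InitiateMultipartUploadRequest("example-bucket", "key-2"));
    assertEquals(2, results.getUploads().size());
  }
}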