HADOOP-13345 S3Guard: Improved Consistency for S3A. Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu, Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.
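For orientation before the diff: the heart of this change is the new MetadataStore layer and the configuration keys it adds to core-default.xml further down. A minimal sketch of how a deployment might enable the DynamoDB-backed store in its site configuration follows; the table name and the choice of values are illustrative assumptions, not part of the commit.

<property>
  <name>fs.s3a.metadatastore.impl</name>
  <value>org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore</value>
</property>
<property>
  <!-- illustrative table name; per the new core-default.xml entry, the bucket name is used when unset -->
  <name>fs.s3a.s3guard.ddb.table</name>
  <value>example-s3guard-table</value>
</property>
<property>
  <!-- allow the S3A client to create the table if it does not already exist -->
  <name>fs.s3a.s3guard.ddb.table.create</name>
  <value>true</value>
</property>

The s3guard.md documentation added under src/site/markdown/tools/hadoop-aws in this commit is the authoritative reference for setup and the remaining keys (region, capacity, retries).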
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/621b43e2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/621b43e2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/621b43e2 Branch: refs/heads/HDFS-10467 Commit: 621b43e254afaff708cd6fc4698b29628f6abc33 Parents: 7a96033 Author: Steve Loughran <[email protected]> Authored: Fri Sep 1 14:13:41 2017 +0100 Committer: Steve Loughran <[email protected]> Committed: Fri Sep 1 14:13:41 2017 +0100 ---------------------------------------------------------------------- .../main/resources/assemblies/hadoop-tools.xml | 13 + hadoop-common-project/hadoop-common/pom.xml | 5 + .../apache/hadoop/fs/AbstractFileSystem.java | 8 + .../java/org/apache/hadoop/fs/FileContext.java | 9 + .../src/main/resources/core-default.xml | 108 ++ .../hadoop/fs/FileSystemContractBaseTest.java | 16 +- .../fs/contract/AbstractContractRenameTest.java | 63 ++ .../org/apache/hadoop/test/LambdaTestUtils.java | 112 ++ hadoop-project/pom.xml | 19 + .../hadoop-aws/dev-support/findbugs-exclude.xml | 6 + hadoop-tools/hadoop-aws/pom.xml | 129 ++- .../org/apache/hadoop/fs/s3a/Constants.java | 133 ++- .../hadoop/fs/s3a/DefaultS3ClientFactory.java | 233 ++++ .../fs/s3a/InconsistentAmazonS3Client.java | 434 ++++++++ .../fs/s3a/InconsistentS3ClientFactory.java | 40 + .../java/org/apache/hadoop/fs/s3a/Listing.java | 263 ++++- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 17 +- .../org/apache/hadoop/fs/s3a/S3AFileStatus.java | 45 +- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 572 ++++++++-- .../hadoop/fs/s3a/S3AInstrumentation.java | 81 +- .../apache/hadoop/fs/s3a/S3AOutputStream.java | 14 +- .../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 36 +- .../apache/hadoop/fs/s3a/S3ClientFactory.java | 190 +--- .../org/apache/hadoop/fs/s3a/Statistic.java | 13 +- .../java/org/apache/hadoop/fs/s3a/Tristate.java | 32 + .../org/apache/hadoop/fs/s3a/UploadInfo.java | 43 + .../fs/s3a/s3guard/DescendantsIterator.java | 142 +++ .../fs/s3a/s3guard/DirListingMetadata.java | 322 ++++++ .../fs/s3a/s3guard/DynamoDBClientFactory.java | 132 +++ .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 1010 ++++++++++++++++++ .../fs/s3a/s3guard/LocalMetadataStore.java | 435 ++++++++ .../hadoop/fs/s3a/s3guard/LruHashMap.java | 50 + .../hadoop/fs/s3a/s3guard/MetadataStore.java | 221 ++++ .../s3guard/MetadataStoreListFilesIterator.java | 169 +++ .../fs/s3a/s3guard/NullMetadataStore.java | 104 ++ .../hadoop/fs/s3a/s3guard/PathMetadata.java | 143 +++ .../PathMetadataDynamoDBTranslation.java | 304 ++++++ .../apache/hadoop/fs/s3a/s3guard/S3Guard.java | 463 ++++++++ .../hadoop/fs/s3a/s3guard/S3GuardTool.java | 924 ++++++++++++++++ .../hadoop/fs/s3a/s3guard/package-info.java | 30 + .../hadoop/fs/s3native/S3xLoginHelper.java | 4 + .../src/main/shellprofile.d/hadoop-s3guard.sh | 37 + .../src/site/markdown/tools/hadoop-aws/index.md | 3 +- .../site/markdown/tools/hadoop-aws/s3guard.md | 610 +++++++++++ .../site/markdown/tools/hadoop-aws/testing.md | 288 ++++- .../fs/contract/s3a/ITestS3AContractCreate.java | 14 + .../fs/contract/s3a/ITestS3AContractDelete.java | 14 + .../fs/contract/s3a/ITestS3AContractDistCp.java | 7 + .../s3a/ITestS3AContractGetFileStatus.java | 4 + .../fs/contract/s3a/ITestS3AContractMkdir.java | 14 + .../fs/contract/s3a/ITestS3AContractOpen.java | 14 + .../fs/contract/s3a/ITestS3AContractRename.java | 13 + .../contract/s3a/ITestS3AContractRootDir.java | 14 + .../fs/contract/s3a/ITestS3AContractSeek.java | 14 + 
.../hadoop/fs/s3a/AbstractS3AMockTest.java | 9 +- .../hadoop/fs/s3a/AbstractS3ATestBase.java | 26 +- .../fs/s3a/ITestS3AAWSCredentialsProvider.java | 4 + .../hadoop/fs/s3a/ITestS3AConfiguration.java | 3 +- .../fs/s3a/ITestS3ACopyFromLocalFile.java | 3 +- .../hadoop/fs/s3a/ITestS3ACredentialsInURL.java | 13 +- .../hadoop/fs/s3a/ITestS3ADelayedFNF.java | 62 ++ .../hadoop/fs/s3a/ITestS3AEmptyDirectory.java | 83 ++ .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 319 +++--- .../fs/s3a/ITestS3AFileOperationCost.java | 40 +- .../fs/s3a/ITestS3AFileSystemContract.java | 1 + .../hadoop/fs/s3a/ITestS3AInconsistency.java | 100 ++ .../hadoop/fs/s3a/ITestS3AMiscOperations.java | 27 + .../hadoop/fs/s3a/ITestS3GuardCreate.java | 61 ++ .../hadoop/fs/s3a/ITestS3GuardEmptyDirs.java | 85 ++ .../fs/s3a/ITestS3GuardListConsistency.java | 544 ++++++++++ .../hadoop/fs/s3a/ITestS3GuardWriteBack.java | 141 +++ .../hadoop/fs/s3a/MockS3ClientFactory.java | 3 + .../apache/hadoop/fs/s3a/S3ATestConstants.java | 12 + .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 197 +++- .../org/apache/hadoop/fs/s3a/TestListing.java | 118 ++ .../ITestS3AFileContextStatistics.java | 4 +- .../s3a/fileContext/ITestS3AFileContextURI.java | 19 +- .../fs/s3a/s3guard/AbstractMSContract.java | 33 + .../s3guard/AbstractS3GuardToolTestBase.java | 161 +++ .../s3a/s3guard/DynamoDBLocalClientFactory.java | 157 +++ .../s3a/s3guard/ITestS3GuardConcurrentOps.java | 160 +++ .../s3a/s3guard/ITestS3GuardToolDynamoDB.java | 134 +++ .../fs/s3a/s3guard/ITestS3GuardToolLocal.java | 149 +++ .../fs/s3a/s3guard/MetadataStoreTestBase.java | 887 +++++++++++++++ .../fs/s3a/s3guard/TestDirListingMetadata.java | 303 ++++++ .../s3a/s3guard/TestDynamoDBMetadataStore.java | 594 ++++++++++ .../fs/s3a/s3guard/TestLocalMetadataStore.java | 140 +++ .../fs/s3a/s3guard/TestNullMetadataStore.java | 58 + .../TestPathMetadataDynamoDBTranslation.java | 238 +++++ .../hadoop/fs/s3a/s3guard/TestS3Guard.java | 93 ++ .../AbstractITestS3AMetadataStoreScale.java | 250 +++++ .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java | 13 +- .../scale/ITestDynamoDBMetadataStoreScale.java | 48 + .../s3a/scale/ITestLocalMetadataStoreScale.java | 37 + .../fs/s3a/scale/ITestS3AConcurrentOps.java | 3 +- .../fs/s3a/scale/ITestS3ACreatePerformance.java | 86 ++ .../s3a/scale/ITestS3ADirectoryPerformance.java | 5 +- .../scale/ITestS3AInputStreamPerformance.java | 4 +- .../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 2 +- .../hadoop-aws/src/test/resources/core-site.xml | 26 + .../src/test/resources/log4j.properties | 15 +- 101 files changed, 13065 insertions(+), 538 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml ---------------------------------------------------------------------- diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml index bc9548b..0a4367d 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml @@ -174,6 +174,19 @@ <directory>../hadoop-sls/target/hadoop-sls-${project.version}/sls</directory> <outputDirectory>/share/hadoop/${hadoop.component}/sls</outputDirectory> </fileSet> + <fileSet> + <directory>../hadoop-aws/src/main/bin</directory> + <outputDirectory>/bin</outputDirectory> + <fileMode>0755</fileMode> + </fileSet> + <fileSet> + 
<directory>../hadoop-aws/src/main/shellprofile.d</directory> + <includes> + <include>*</include> + </includes> + <outputDirectory>/libexec/shellprofile.d</outputDirectory> + <fileMode>0755</fileMode> + </fileSet> </fileSets> <dependencySets> <dependencySet> http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index e9222bb..1d188ba 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -172,6 +172,11 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>compile</scope> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <scope>compile</scope> http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java index 9bea8f9..df14ee8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java @@ -20,6 +20,7 @@ package org.apache.hadoop.fs; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -132,6 +133,13 @@ public abstract class AbstractFileSystem { CONSTRUCTOR_CACHE.put(theClass, meth); } result = meth.newInstance(uri, conf); + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new RuntimeException(cause); + } } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index fef968b..21733b3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -331,6 +331,15 @@ public class FileContext { return AbstractFileSystem.get(uri, conf); } }); + } catch (RuntimeException ex) { + // RTEs can wrap other exceptions; if there is an IOException inner, + // throw it direct. 
+ Throwable cause = ex.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw ex; + } } catch (InterruptedException ex) { LOG.error(ex.toString()); throw new IOException("Failed to get the AbstractFileSystem for path: " http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index cb061aa..9e2c553 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1309,12 +1309,120 @@ </property> <property> + <name>fs.s3a.metadatastore.authoritative</name> + <value>false</value> + <description> + When true, allow MetadataStore implementations to act as source of + truth for getting file status and directory listings. Even if this + is set to true, MetadataStore implementations may choose not to + return authoritative results. If the configured MetadataStore does + not support being authoritative, this setting will have no effect. + </description> +</property> + +<property> + <name>fs.s3a.metadatastore.impl</name> + <value>org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore</value> + <description> + Fully-qualified name of the class that implements the MetadataStore + to be used by s3a. The default class, NullMetadataStore, has no + effect: s3a will continue to treat the backing S3 service as the one + and only source of truth for file and directory metadata. + </description> +</property> + +<property> + <name>fs.s3a.s3guard.cli.prune.age</name> + <value>86400000</value> + <description> + Default age (in milliseconds) after which to prune metadata from the + metadatastore when the prune command is run. Can be overridden on the + command-line. + </description> +</property> + + +<property> <name>fs.s3a.impl</name> <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> <description>The implementation class of the S3A Filesystem</description> </property> <property> + <name>fs.s3a.s3guard.ddb.region</name> + <value></value> + <description> + AWS DynamoDB region to connect to. An up-to-date list is + provided in the AWS Documentation: regions and endpoints. Without this + property, the S3Guard will operate table in the associated S3 bucket region. + </description> +</property> + +<property> + <name>fs.s3a.s3guard.ddb.table</name> + <value></value> + <description> + The DynamoDB table name to operate. Without this property, the respective + S3 bucket name will be used. + </description> +</property> + +<property> + <name>fs.s3a.s3guard.ddb.table.create</name> + <value>false</value> + <description> + If true, the S3A client will create the table if it does not already exist. + </description> +</property> + +<property> + <name>fs.s3a.s3guard.ddb.table.capacity.read</name> + <value>500</value> + <description> + Provisioned throughput requirements for read operations in terms of capacity + units for the DynamoDB table. This config value will only be used when + creating a new DynamoDB table, though later you can manually provision by + increasing or decreasing read capacity as needed for existing tables. + See DynamoDB documents for more information. 
+ </description> +</property> + +<property> + <name>fs.s3a.s3guard.ddb.table.capacity.write</name> + <value>100</value> + <description> + Provisioned throughput requirements for write operations in terms of + capacity units for the DynamoDB table. Refer to related config + fs.s3a.s3guard.ddb.table.capacity.read before usage. + </description> +</property> + +<property> + <name>fs.s3a.s3guard.ddb.max.retries</name> + <value>9</value> + <description> + Max retries on batched DynamoDB operations before giving up and + throwing an IOException. Each retry is delayed with an exponential + backoff timer which starts at 100 milliseconds and approximately + doubles each time. The minimum wait before throwing an exception is + sum(100, 200, 400, 800, .. 100*2^N-1 ) == 100 * ((2^N)-1) + So N = 9 yields at least 51.1 seconds (51,100) milliseconds of blocking + before throwing an IOException. + </description> +</property> + +<property> + <name>fs.s3a.s3guard.ddb.background.sleep</name> + <value>25</value> + <description> + Length (in milliseconds) of pause between each batch of deletes when + pruning metadata. Prevents prune operations (which can typically be low + priority background operations) from overly interfering with other I/O + operations. + </description> +</property> + +<property> <name>fs.AbstractFileSystem.s3a.impl</name> <value>org.apache.hadoop.fs.s3a.S3A</value> <description>The implementation class of the S3A AbstractFileSystem.</description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java index 92e2135..9d8cd64 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java @@ -748,13 +748,27 @@ public abstract class FileSystemContractBaseTest { /** * This a sanity check to make sure that any filesystem's handling of - * renames doesn't cause any regressions + * renames empty dirs doesn't cause any regressions. + */ + public void testRenameEmptyToDirWithSamePrefixAllowed() throws Throwable { + assumeTrue(renameSupported()); + Path parentdir = path("testRenameEmptyToDirWithSamePrefixAllowed"); + fs.mkdirs(parentdir); + Path dest = path("testRenameEmptyToDirWithSamePrefixAllowedDest"); + rename(parentdir, dest, true, false, true); + } + + /** + * This a sanity check to make sure that any filesystem's handling of + * renames non-empty dirs doesn't cause any regressions. 
*/ @Test public void testRenameToDirWithSamePrefixAllowed() throws Throwable { assumeTrue(renameSupported()); final Path parentdir = path("testRenameToDirWithSamePrefixAllowed"); fs.mkdirs(parentdir); + // Before renaming, we create one file under the source parent directory + createFile(new Path(parentdir, "mychild")); final Path dest = path("testRenameToDirWithSamePrefixAllowedDest"); rename(parentdir, dest, true, false, true); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java index b0dcb93..b6d0a49 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java @@ -222,4 +222,67 @@ public abstract class AbstractContractRenameTest extends assertPathDoesNotExist("not deleted", new Path(srcDir, "source.txt")); } + + /** + * Test that after renaming, the nested subdirectory is moved along with all + * its ancestors. + */ + @Test + public void testRenamePopulatesDirectoryAncestors() throws IOException { + final FileSystem fs = getFileSystem(); + final Path src = path("testRenamePopulatesDirectoryAncestors/source"); + fs.mkdirs(src); + final String nestedDir = "/dir1/dir2/dir3/dir4"; + fs.mkdirs(path(src + nestedDir)); + + Path dst = path("testRenamePopulatesDirectoryAncestorsNew"); + + fs.rename(src, dst); + validateAncestorsMoved(src, dst, nestedDir); + } + + /** + * Test that after renaming, the nested file is moved along with all its + * ancestors. It is similar to {@link #testRenamePopulatesDirectoryAncestors}. + */ + @Test + public void testRenamePopulatesFileAncestors() throws IOException { + final FileSystem fs = getFileSystem(); + final Path src = path("testRenamePopulatesFileAncestors/source"); + fs.mkdirs(src); + final String nestedFile = "/dir1/dir2/dir3/file4"; + byte[] srcDataset = dataset(256, 'a', 'z'); + writeDataset(fs, path(src + nestedFile), srcDataset, srcDataset.length, + 1024, false); + + Path dst = path("testRenamePopulatesFileAncestorsNew"); + + fs.rename(src, dst); + validateAncestorsMoved(src, dst, nestedFile); + } + + /** + * Validate that the nested path and its ancestors should have been moved. 
+ * + * @param src the source root to move + * @param dst the destination root to move + * @param nestedPath the nested path to move + */ + private void validateAncestorsMoved(Path src, Path dst, String nestedPath) + throws IOException { + assertIsDirectory(dst); + assertPathDoesNotExist("src path should not exist", path(src + nestedPath)); + assertPathExists("dst path should exist", path(dst + nestedPath)); + + Path path = new Path(nestedPath).getParent(); + while (path != null && !path.isRoot()) { + final Path parentSrc = path(src + path.toString()); + assertPathDoesNotExist(parentSrc + " is not deleted", parentSrc); + final Path parentDst = path(dst + path.toString()); + assertPathExists(parentDst + " should exist after rename", parentDst); + assertIsDirectory(parentDst); + path = path.getParent(); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java index 1fa5c3f..00cfa44 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java @@ -249,6 +249,23 @@ public final class LambdaTestUtils { } /** + * Variant of {@link #eventually(int, Callable, Callable)} method for + * void lambda expressions. + * @param timeoutMillis timeout in milliseconds. + * Can be zero, in which case only one attempt is made before failing. + * @param eval expression to evaluate + * @param retry retry interval generator + * @throws Exception the last exception thrown before timeout was triggered + * @throws FailFastException if raised -without any retry attempt. + * @throws InterruptedException if interrupted during the sleep operation. + */ + public static void eventually(int timeoutMillis, + VoidCallable eval, + Callable<Integer> retry) throws Exception { + eventually(timeoutMillis, new VoidCaller(eval), retry); + } + + /** * Simplified {@link #eventually(int, Callable, Callable)} method * with a fixed interval. * <p> @@ -277,6 +294,25 @@ public final class LambdaTestUtils { } /** + /** + * Variant of {@link #eventually(int, int, Callable)} method for + * void lambda expressions. + * @param timeoutMillis timeout in milliseconds. + * Can be zero, in which case only one attempt is made before failing. + * @param intervalMillis interval in milliseconds + * @param eval expression to evaluate + * @throws Exception the last exception thrown before timeout was triggered + * @throws FailFastException if raised -without any retry attempt. + * @throws InterruptedException if interrupted during the sleep operation. + */ + public static void eventually(int timeoutMillis, + int intervalMillis, + VoidCallable eval) throws Exception { + eventually(timeoutMillis, eval, + new FixedRetryInterval(intervalMillis)); + } + + /** * Intercept an exception; throw an {@code AssertionError} if one not raised. * The caught exception is rethrown if it is of the wrong class or * does not contain the text defined in {@code contained}. @@ -319,6 +355,32 @@ public final class LambdaTestUtils { } /** + * Variant of {@link #intercept(Class, Callable)} to simplify void + * invocations. 
+ * @param clazz class of exception; the raised exception must be this class + * <i>or a subclass</i>. + * @param eval expression to eval + * @param <E> exception class + * @return the caught exception if it was of the expected type + * @throws Exception any other exception raised + * @throws AssertionError if the evaluation call didn't raise an exception. + */ + public static <E extends Throwable> E intercept( + Class<E> clazz, + VoidCallable eval) + throws Exception { + try { + eval.call(); + throw new AssertionError("Expected an exception"); + } catch (Throwable e) { + if (clazz.isAssignableFrom(e.getClass())) { + return (E)e; + } + throw e; + } + } + + /** * Intercept an exception; throw an {@code AssertionError} if one not raised. * The caught exception is rethrown if it is of the wrong class or * does not contain the text defined in {@code contained}. @@ -359,6 +421,29 @@ public final class LambdaTestUtils { } /** + * Variant of {@link #intercept(Class, Callable)} to simplify void + * invocations. + * @param clazz class of exception; the raised exception must be this class + * <i>or a subclass</i>. + * @param contained string which must be in the {@code toString()} value + * of the exception + * @param eval expression to eval + * @param <E> exception class + * @return the caught exception if it was of the expected type + * @throws Exception any other exception raised + * @throws AssertionError if the evaluation call didn't raise an exception. + */ + public static <E extends Throwable> E intercept( + Class<E> clazz, + String contained, + VoidCallable eval) + throws Exception { + E ex = intercept(clazz, eval); + GenericTestUtils.assertExceptionContains(contained, ex); + return ex; + } + + /** * Robust string converter for exception messages; if the {@code toString()} * method throws an exception then that exception is caught and logged, * then a simple string of the classname logged. @@ -518,4 +603,31 @@ public final class LambdaTestUtils { return new FailFastException(String.format(format, args)); } } + + /** + * A simple interface for lambdas, which returns nothing; this exists + * to simplify lambda tests on operations with no return value. + */ + public interface VoidCallable { + void call() throws Exception; + } + + /** + * Bridge class to make {@link VoidCallable} something to use in anything + * which takes an {@link Callable}. 
+ */ + public static class VoidCaller implements Callable<Void> { + private final VoidCallable callback; + + public VoidCaller(VoidCallable callback) { + this.callback = callback; + } + + @Override + public Void call() throws Exception { + callback.call(); + return null; + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 938ef05..9a52d76 100755 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -870,6 +870,17 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-configuration2</artifactId> <version>2.1</version> + <exclusions> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.4</version> </dependency> <dependency> <groupId>org.slf4j</groupId> @@ -1734,4 +1745,12 @@ </build> </profile> </profiles> + + <repositories> + <repository> + <id>dynamodb-local-oregon</id> + <name>DynamoDB Local Release Repository</name> + <url>https://s3-us-west-2.amazonaws.com/dynamodb-local/release</url> + </repository> + </repositories> </project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml index ffb0a79..82ec16e 100644 --- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml @@ -26,4 +26,10 @@ <Match> <Class name="org.apache.hadoop.fs.s3.INode" /> </Match> + <!-- Redundant null check makes code clearer, future-proof here. --> + <Match> + <Class name="org.apache.hadoop.fs.s3a.S3AFileSystem" /> + <Method name="s3Exists" /> + <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" /> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 91e94a6..bcb0e07 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -36,6 +36,7 @@ <downloadSources>true</downloadSources> <hadoop.tmp.dir>${project.build.directory}/test</hadoop.tmp.dir> + <dynamodb.local.version>1.11.86</dynamodb.local.version> <!-- are scale tests enabled ? --> <fs.s3a.scale.test.enabled>unset</fs.s3a.scale.test.enabled> <!-- Size in MB of huge files. --> @@ -44,6 +45,11 @@ <fs.s3a.scale.test.huge.partitionsize>unset</fs.s3a.scale.test.huge.partitionsize> <!-- Timeout in seconds for scale tests.--> <fs.s3a.scale.test.timeout>3600</fs.s3a.scale.test.timeout> + <!-- are scale tests enabled ? 
--> + <fs.s3a.s3guard.test.enabled>false</fs.s3a.s3guard.test.enabled> + <fs.s3a.s3guard.test.authoritative>false</fs.s3a.s3guard.test.authoritative> + <fs.s3a.s3guard.test.implementation>local</fs.s3a.s3guard.test.implementation> + </properties> <profiles> @@ -164,6 +170,11 @@ <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize> <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize> <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout> + <!-- S3Guard --> + <fs.s3a.s3guard.test.enabled>${fs.s3a.s3guard.test.enabled}</fs.s3a.s3guard.test.enabled> + <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative> + <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation> + </systemPropertyVariables> <!-- Some tests cannot run in parallel. Tests that cover --> <!-- access to the root directory must run in isolation --> @@ -205,6 +216,10 @@ <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize> <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize> <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout> + <!-- S3Guard --> + <fs.s3a.s3guard.test.enabled>${fs.s3a.s3guard.test.enabled}</fs.s3a.s3guard.test.enabled> + <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation> + <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative> </systemPropertyVariables> <!-- Do a sequential run for tests that cannot handle --> <!-- parallel execution. --> @@ -247,6 +262,10 @@ <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled> <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize> <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout> + <!-- S3Guard --> + <fs.s3a.s3guard.test.enabled>${fs.s3a.s3guard.test.enabled}</fs.s3a.s3guard.test.enabled> + <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation> + <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative> </systemPropertyVariables> <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds> </configuration> @@ -269,6 +288,60 @@ <fs.s3a.scale.test.enabled>true</fs.s3a.scale.test.enabled> </properties> </profile> + + <!-- Turn on S3Guard tests--> + <profile> + <id>s3guard</id> + <activation> + <property> + <name>s3guard</name> + </property> + </activation> + <properties > + <fs.s3a.s3guard.test.enabled>true</fs.s3a.s3guard.test.enabled> + </properties> + </profile> + + <!-- Switch to DynamoDB for S3Guard. Has no effect unless S3Guard is enabled --> + <profile> + <id>dynamo</id> + <activation> + <property> + <name>dynamo</name> + </property> + </activation> + <properties > + <fs.s3a.s3guard.test.implementation>dynamo</fs.s3a.s3guard.test.implementation> + </properties> + </profile> + + <!-- Switch to DynamoDBLocal for S3Guard. 
Has no effect unless S3Guard is enabled --> + <profile> + <id>dynamodblocal</id> + <activation> + <property> + <name>dynamodblocal</name> + </property> + </activation> + <properties> + <fs.s3a.s3guard.test.implementation>dynamodblocal</fs.s3a.s3guard.test.implementation> + </properties> + </profile> + + <!-- Switch S3Guard from Authoritative=false to true + Has no effect unless S3Guard is enabled --> + <profile> + <id>non-auth</id> + <activation> + <property> + <name>auth</name> + </property> + </activation> + <properties > + <fs.s3a.s3guard.test.authoritative>true</fs.s3a.s3guard.test.authoritative> + </properties> + </profile> + </profiles> <build> @@ -296,16 +369,48 @@ <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> - <id>deplist</id> + <id>deplist1</id> <phase>compile</phase> <goals> <goal>list</goal> </goals> <configuration> - <!-- build a shellprofile --> + <!-- build a shellprofile for hadoop-aws optional tools --> <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile> </configuration> </execution> + <execution> + <id>copy</id> + <phase>test-compile</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <includeScope>test</includeScope> + <includeTypes>so,dll,dylib</includeTypes> + <outputDirectory>${project.build.directory}/native-libs</outputDirectory> + </configuration> + </execution> + <execution> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + </configuration> + </execution> + <execution> + <id>deplist2</id> + <phase>compile</phase> + <goals> + <goal>list</goal> + </goals> + <configuration> + <!-- referenced by the s3guard command --> + <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-builtin.txt</outputFile> + </configuration> + </execution> </executions> </plugin> </plugins> @@ -334,6 +439,26 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>DynamoDBLocal</artifactId> + <version>${dynamodb.local.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 9e15b3f..1a464d0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -267,6 +267,11 @@ public final class Constants { public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix"; + /** Whether or not to allow MetadataStore to be source of truth. 
*/ + public static final String METADATASTORE_AUTHORITATIVE = + "fs.s3a.metadatastore.authoritative"; + public static final boolean DEFAULT_METADATASTORE_AUTHORITATIVE = false; + /** read ahead buffer size to prevent connection re-establishments. */ public static final String READAHEAD_RANGE = "fs.s3a.readahead.range"; public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024; @@ -312,7 +317,7 @@ public final class Constants { @InterfaceStability.Unstable public static final Class<? extends S3ClientFactory> DEFAULT_S3_CLIENT_FACTORY_IMPL = - S3ClientFactory.DefaultS3ClientFactory.class; + DefaultS3ClientFactory.class; /** * Maximum number of partitions in a multipart upload: {@value}. @@ -320,4 +325,130 @@ public final class Constants { @InterfaceAudience.Private public static final int MAX_MULTIPART_COUNT = 10000; + /** + * Classname of the S3A-specific output committer factory. This + * is what must be declared when attempting to use + */ + @InterfaceStability.Unstable + public static final String S3A_OUTPUT_COMMITTER_FACTORY = + "org.apache.hadoop.fs.s3a.commit.S3AOutputCommitterFactory"; + + /* Constants. */ + public static final String S3_METADATA_STORE_IMPL = + "fs.s3a.metadatastore.impl"; + + /** Minimum period of time (in milliseconds) to keep metadata (may only be + * applied when a prune command is manually run). + */ + @InterfaceStability.Unstable + public static final String S3GUARD_CLI_PRUNE_AGE = + "fs.s3a.s3guard.cli.prune.age"; + + /** + * The region of the DynamoDB service. + * + * This config has no default value. If the user does not set this, the + * S3Guard will operate table in the associated S3 bucket region. + */ + @InterfaceStability.Unstable + public static final String S3GUARD_DDB_REGION_KEY = + "fs.s3a.s3guard.ddb.region"; + + /** + * The DynamoDB table name to use. + * + * This config has no default value. If the user does not set this, the + * S3Guard implementation will use the respective S3 bucket name. + */ + @InterfaceStability.Unstable + public static final String S3GUARD_DDB_TABLE_NAME_KEY = + "fs.s3a.s3guard.ddb.table"; + + /** + * Whether to create the DynamoDB table if the table does not exist. + */ + @InterfaceStability.Unstable + public static final String S3GUARD_DDB_TABLE_CREATE_KEY = + "fs.s3a.s3guard.ddb.table.create"; + + @InterfaceStability.Unstable + public static final String S3GUARD_DDB_TABLE_CAPACITY_READ_KEY = + "fs.s3a.s3guard.ddb.table.capacity.read"; + public static final long S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT = 500; + @InterfaceStability.Unstable + public static final String S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY = + "fs.s3a.s3guard.ddb.table.capacity.write"; + public static final long S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT = 100; + + /** + * The maximum put or delete requests per BatchWriteItem request. + * + * Refer to Amazon API reference for this limit. + */ + public static final int S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT = 25; + + @InterfaceStability.Unstable + public static final String S3GUARD_DDB_MAX_RETRIES = + "fs.s3a.s3guard.ddb.max.retries"; + /** + * Max retries on batched DynamoDB operations before giving up and + * throwing an IOException. Default is {@value}. See core-default.xml for + * more detail. + */ + public static final int S3GUARD_DDB_MAX_RETRIES_DEFAULT = 9; + + /** + * Period of time (in milliseconds) to sleep between batches of writes. + * Currently only applies to prune operations, as they are naturally a + * lower priority than other operations. 
+ */ + @InterfaceStability.Unstable + public static final String S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY = + "fs.s3a.s3guard.ddb.background.sleep"; + public static final int S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT = 25; + + /** + * V1 committer. + */ + @InterfaceStability.Unstable + public static final String S3A_OUTPUT_COMMITTER_MRV1 = + "org.apache.hadoop.fs.s3a.commit.S3OutputCommitterMRv1"; + + /** + * The default "Null" metadata store: {@value}. + */ + @InterfaceStability.Unstable + public static final String S3GUARD_METASTORE_NULL + = "org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore"; + + /** + * Use Local memory for the metadata: {@value}. + * This is not coherent across processes and must be used for testing only. + */ + @InterfaceStability.Unstable + public static final String S3GUARD_METASTORE_LOCAL + = "org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore"; + + /** + * Use DynamoDB for the metadata: {@value}. + */ + @InterfaceStability.Unstable + public static final String S3GUARD_METASTORE_DYNAMO + = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore"; + + /** + * Inconsistency (visibility delay) injection settings. + */ + @InterfaceStability.Unstable + public static final String FAIL_INJECT_INCONSISTENCY_KEY = + "fs.s3a.failinject.inconsistency.key.substring"; + + @InterfaceStability.Unstable + public static final String FAIL_INJECT_INCONSISTENCY_MSEC = + "fs.s3a.failinject.inconsistency.msec"; + + @InterfaceStability.Unstable + public static final String FAIL_INJECT_INCONSISTENCY_PROBABILITY = + "fs.s3a.failinject.inconsistency.probability"; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java new file mode 100644 index 0000000..f33b25e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.VersionInfo; +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.URI; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet; +import static org.apache.hadoop.fs.s3a.S3AUtils.intOption; + +/** + * The default factory implementation, which calls the AWS SDK to configure + * and create an {@link AmazonS3Client} that communicates with the S3 service. + */ +public class DefaultS3ClientFactory extends Configured implements + S3ClientFactory { + + protected static final Logger LOG = S3AFileSystem.LOG; + + @Override + public AmazonS3 createS3Client(URI name) throws IOException { + Configuration conf = getConf(); + AWSCredentialsProvider credentials = + createAWSCredentialProviderSet(name, conf); + final ClientConfiguration awsConf = createAwsConf(getConf()); + AmazonS3 s3 = newAmazonS3Client(credentials, awsConf); + return createAmazonS3Client(s3, conf, credentials, awsConf); + } + + /** + * Create a new {@link ClientConfiguration}. + * @param conf The Hadoop configuration + * @return new AWS client configuration + */ + public static ClientConfiguration createAwsConf(Configuration conf) { + final ClientConfiguration awsConf = new ClientConfiguration(); + initConnectionSettings(conf, awsConf); + initProxySupport(conf, awsConf); + initUserAgent(conf, awsConf); + return awsConf; + } + + /** + * Wrapper around constructor for {@link AmazonS3} client. Override this to + * provide an extended version of the client + * @param credentials credentials to use + * @param awsConf AWS configuration + * @return new AmazonS3 client + */ + protected AmazonS3 newAmazonS3Client( + AWSCredentialsProvider credentials, ClientConfiguration awsConf) { + return new AmazonS3Client(credentials, awsConf); + } + + /** + * Initializes all AWS SDK settings related to connection management. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + */ + private static void initConnectionSettings(Configuration conf, + ClientConfiguration awsConf) { + awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS, 1)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, + DEFAULT_SECURE_CONNECTIONS); + awsConf.setProtocol(secureConnections ? 
Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES, 0)); + awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, + DEFAULT_ESTABLISH_TIMEOUT, 0)); + awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT, 0)); + int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, + DEFAULT_SOCKET_SEND_BUFFER, 2048); + int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, + DEFAULT_SOCKET_RECV_BUFFER, 2048); + awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); + String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); + if (!signerOverride.isEmpty()) { + LOG.debug("Signer override = {}", signerOverride); + awsConf.setSignerOverride(signerOverride); + } + } + + /** + * Initializes AWS SDK proxy support if configured. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + * @throws IllegalArgumentException if misconfigured + */ + private static void initProxySupport(Configuration conf, + ClientConfiguration awsConf) throws IllegalArgumentException { + String proxyHost = conf.getTrimmed(PROXY_HOST, ""); + int proxyPort = conf.getInt(PROXY_PORT, -1); + if (!proxyHost.isEmpty()) { + awsConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + awsConf.setProxyPort(proxyPort); + } else { + if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) { + LOG.warn("Proxy host set without port. Using HTTPS default 443"); + awsConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. Using HTTP default 80"); + awsConf.setProxyPort(80); + } + } + String proxyUsername = conf.getTrimmed(PROXY_USERNAME); + String proxyPassword = conf.getTrimmed(PROXY_PASSWORD); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME + " or " + + PROXY_PASSWORD + " set without the other."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + awsConf.setProxyUsername(proxyUsername); + awsConf.setProxyPassword(proxyPassword); + awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); + awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); + if (LOG.isDebugEnabled()) { + LOG.debug("Using proxy server {}:{} as user {} with password {} on " + + "domain {} as workstation {}", awsConf.getProxyHost(), + awsConf.getProxyPort(), + String.valueOf(awsConf.getProxyUsername()), + awsConf.getProxyPassword(), awsConf.getProxyDomain(), + awsConf.getProxyWorkstation()); + } + } else if (proxyPort >= 0) { + String msg = + "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + } + + /** + * Initializes the User-Agent header to send in HTTP requests to the S3 + * back-end. We always include the Hadoop version number. The user also + * may set an optional custom prefix to put in front of the Hadoop version + * number. The AWS SDK interally appends its own information, which seems + * to include the AWS SDK version, OS and JVM version. 
+ * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + */ + private static void initUserAgent(Configuration conf, + ClientConfiguration awsConf) { + String userAgent = "Hadoop " + VersionInfo.getVersion(); + String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); + if (!userAgentPrefix.isEmpty()) { + userAgent = userAgentPrefix + ", " + userAgent; + } + LOG.debug("Using User-Agent: {}", userAgent); + awsConf.setUserAgentPrefix(userAgent); + } + + /** + * Creates an {@link AmazonS3Client} from the established configuration. + * + * @param conf Hadoop configuration + * @param credentials AWS credentials + * @param awsConf AWS SDK configuration + * @return S3 client + * @throws IllegalArgumentException if misconfigured + */ + private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf, + AWSCredentialsProvider credentials, ClientConfiguration awsConf) + throws IllegalArgumentException { + String endPoint = conf.getTrimmed(ENDPOINT, ""); + if (!endPoint.isEmpty()) { + try { + s3.setEndpoint(endPoint); + } catch (IllegalArgumentException e) { + String msg = "Incorrect endpoint: " + e.getMessage(); + LOG.error(msg); + throw new IllegalArgumentException(msg, e); + } + } + enablePathStyleAccessIfRequired(s3, conf); + return s3; + } + + /** + * Enables path-style access to S3 buckets if configured. By default, the + * behavior is to use virtual hosted-style access with URIs of the form + * http://bucketname.s3.amazonaws.com. Enabling path-style access and a + * region-specific endpoint switches the behavior to use URIs of the form + * http://s3-eu-west-1.amazonaws.com/bucketname. + * + * @param s3 S3 client + * @param conf Hadoop configuration + */ + private static void enablePathStyleAccessIfRequired(AmazonS3 s3, + Configuration conf) { + final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false); + if (pathStyleAccess) { + LOG.debug("Enabling path style access!"); + s3.setS3ClientOptions(S3ClientOptions.builder() + .setPathStyleAccess(true) + .build()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java new file mode 100644 index 0000000..5e9cb3f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.DeleteObjectRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.DeleteObjectsResult; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +/** + * A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects + * inconsistency and/or errors. Used for testing S3Guard. + * Currently only delays listing visibility, not affecting GET. + */ [email protected] [email protected] +public class InconsistentAmazonS3Client extends AmazonS3Client { + + /** + * Keys containing this substring will be subject to delayed visibility. + */ + public static final String DEFAULT_DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME"; + + /** + * How many seconds affected keys will be delayed from appearing in listing. + * This should probably be a config value. + */ + public static final long DEFAULT_DELAY_KEY_MSEC = 5 * 1000; + + public static final float DEFAULT_DELAY_KEY_PROBABILITY = 1.0f; + + /** Special config value since we can't store empty strings in XML. */ + public static final String MATCH_ALL_KEYS = "*"; + + private static final Logger LOG = + LoggerFactory.getLogger(InconsistentAmazonS3Client.class); + + /** Empty string matches all keys. */ + private String delayKeySubstring; + + /** Probability to delay visibility of a matching key. */ + private float delayKeyProbability; + + /** Time in milliseconds to delay visibility of newly modified object. */ + private long delayKeyMsec; + + /** + * Composite of data we need to track about recently deleted objects: + * when it was deleted (same was with recently put objects) and the object + * summary (since we should keep returning it for sometime after its + * deletion). + */ + private static class Delete { + private Long time; + private S3ObjectSummary summary; + + Delete(Long time, S3ObjectSummary summary) { + this.time = time; + this.summary = summary; + } + + public Long time() { + return time; + } + + public S3ObjectSummary summary() { + return summary; + } + } + + /** Map of key to delay -> time it was deleted + object summary (object + * summary is null for prefixes. */ + private Map<String, Delete> delayedDeletes = new HashMap<>(); + + /** Map of key to delay -> time it was created. 
*/ + private Map<String, Long> delayedPutKeys = new HashMap<>(); + + public InconsistentAmazonS3Client(AWSCredentialsProvider credentials, + ClientConfiguration clientConfiguration, Configuration conf) { + super(credentials, clientConfiguration); + setupConfig(conf); + } + + protected void setupConfig(Configuration conf) { + + delayKeySubstring = conf.get(FAIL_INJECT_INCONSISTENCY_KEY, + DEFAULT_DELAY_KEY_SUBSTRING); + // "" is a substring of all strings, use it to match all keys. + if (delayKeySubstring.equals(MATCH_ALL_KEYS)) { + delayKeySubstring = ""; + } + delayKeyProbability = conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, + DEFAULT_DELAY_KEY_PROBABILITY); + delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC, + DEFAULT_DELAY_KEY_MSEC); + LOG.info("Enabled with {} msec delay, substring {}, probability {}", + delayKeyMsec, delayKeySubstring, delayKeyProbability); + } + + /** + * Clear all oustanding inconsistent keys. After calling this function, + * listings should behave normally (no failure injection), until additional + * keys are matched for delay, e.g. via putObject(), deleteObject(). + */ + public void clearInconsistency() { + LOG.info("clearing all delayed puts / deletes"); + delayedDeletes.clear(); + delayedPutKeys.clear(); + } + + /** + * Convenience function for test code to cast from supertype. + * @param c supertype to cast from + * @return subtype, not null + * @throws Exception on error + */ + public static InconsistentAmazonS3Client castFrom(AmazonS3 c) throws + Exception { + InconsistentAmazonS3Client ic = null; + if (c instanceof InconsistentAmazonS3Client) { + ic = (InconsistentAmazonS3Client) c; + } + Preconditions.checkNotNull(ic, "Not an instance of " + + "InconsistentAmazonS3Client"); + return ic; + } + + @Override + public DeleteObjectsResult deleteObjects(DeleteObjectsRequest + deleteObjectsRequest) + throws AmazonClientException, AmazonServiceException { + for (DeleteObjectsRequest.KeyVersion keyVersion : + deleteObjectsRequest.getKeys()) { + registerDeleteObject(keyVersion.getKey(), deleteObjectsRequest + .getBucketName()); + } + return super.deleteObjects(deleteObjectsRequest); + } + + @Override + public void deleteObject(DeleteObjectRequest deleteObjectRequest) + throws AmazonClientException, AmazonServiceException { + String key = deleteObjectRequest.getKey(); + LOG.debug("key {}", key); + registerDeleteObject(key, deleteObjectRequest.getBucketName()); + super.deleteObject(deleteObjectRequest); + } + + /* We should only need to override this version of putObject() */ + @Override + public PutObjectResult putObject(PutObjectRequest putObjectRequest) + throws AmazonClientException, AmazonServiceException { + LOG.debug("key {}", putObjectRequest.getKey()); + registerPutObject(putObjectRequest); + return super.putObject(putObjectRequest); + } + + /* We should only need to override this version of listObjects() */ + @Override + public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) + throws AmazonClientException, AmazonServiceException { + LOG.debug("prefix {}", listObjectsRequest.getPrefix()); + ObjectListing listing = super.listObjects(listObjectsRequest); + listing = filterListObjects(listObjectsRequest, listing); + listing = restoreListObjects(listObjectsRequest, listing); + return listing; + } + + private void addSummaryIfNotPresent(List<S3ObjectSummary> list, + S3ObjectSummary item) { + // Behavior of S3ObjectSummary + String key = item.getKey(); + for (S3ObjectSummary member : list) { + if (member.getKey().equals(key)) { + 
return; + } + } + list.add(item); + } + + /** + * Add prefix of child to given list. The added prefix will be equal to + * ancestor plus one directory past ancestor. e.g.: + * if ancestor is "/a/b/c" and child is "/a/b/c/d/e/file" then "a/b/c/d" is + * added to list. + * @param prefixes list to add to + * @param ancestor path we are listing in + * @param child full path to get prefix from + */ + private void addPrefixIfNotPresent(List<String> prefixes, String ancestor, + String child) { + Path prefixCandidate = new Path(child).getParent(); + Path ancestorPath = new Path(ancestor); + Preconditions.checkArgument(child.startsWith(ancestor), "%s does not " + + "start with %s", child, ancestor); + while (!prefixCandidate.isRoot()) { + Path nextParent = prefixCandidate.getParent(); + if (nextParent.equals(ancestorPath)) { + String prefix = prefixCandidate.toString(); + if (!prefixes.contains(prefix)) { + prefixes.add(prefix); + } + return; + } + prefixCandidate = nextParent; + } + } + + /** + * Checks that the parent key is an ancestor of the child key. + * @param parent key that may be the parent. + * @param child key that may be the child. + * @param recursive if false, only return true for direct children. If + * true, any descendant will count. + * @return true if parent is an ancestor of child + */ + private boolean isDescendant(String parent, String child, boolean recursive) { + if (recursive) { + if (!parent.endsWith("/")) { + parent = parent + "/"; + } + return child.startsWith(parent); + } else { + Path actualParentPath = new Path(child).getParent(); + Path expectedParentPath = new Path(parent); + return actualParentPath.equals(expectedParentPath); + } + } + + /** + * Simulate eventual consistency of delete for this list operation: Any + * recently-deleted keys will be added. + * @param request List request + * @param rawListing listing returned from underlying S3 + * @return listing with recently-deleted items restored + */ + private ObjectListing restoreListObjects(ListObjectsRequest request, + ObjectListing rawListing) { + List<S3ObjectSummary> outputList = rawListing.getObjectSummaries(); + List<String> outputPrefixes = rawListing.getCommonPrefixes(); + // recursive list has no delimiter, returns everything that matches a + // prefix. 
+ boolean recursiveObjectList = !("/".equals(request.getDelimiter())); + + // Go through all deleted keys + for (String key : new HashSet<>(delayedDeletes.keySet())) { + Delete delete = delayedDeletes.get(key); + if (isKeyDelayed(delete.time(), key)) { + if (isDescendant(request.getPrefix(), key, recursiveObjectList)) { + if (delete.summary() != null) { + addSummaryIfNotPresent(outputList, delete.summary()); + } + } + // Non-recursive list has delimiter: will return rolled-up prefixes for + // all keys that are not direct children + if (!recursiveObjectList) { + if (isDescendant(request.getPrefix(), key, true)) { + addPrefixIfNotPresent(outputPrefixes, request.getPrefix(), key); + } + } + } else { + // Clean up any expired entries + delayedDeletes.remove(key); + } + } + + return new CustomObjectListing(rawListing, outputList, outputPrefixes); + } + + private ObjectListing filterListObjects(ListObjectsRequest request, + ObjectListing rawListing) { + + // Filter object listing + List<S3ObjectSummary> outputList = new ArrayList<>(); + for (S3ObjectSummary s : rawListing.getObjectSummaries()) { + String key = s.getKey(); + if (!isKeyDelayed(delayedPutKeys.get(key), key)) { + outputList.add(s); + } + } + + // Filter prefixes (directories) + List<String> outputPrefixes = new ArrayList<>(); + for (String key : rawListing.getCommonPrefixes()) { + if (!isKeyDelayed(delayedPutKeys.get(key), key)) { + outputPrefixes.add(key); + } + } + + return new CustomObjectListing(rawListing, outputList, outputPrefixes); + } + + private boolean isKeyDelayed(Long enqueueTime, String key) { + if (enqueueTime == null) { + LOG.debug("no delay for key {}", key); + return false; + } + long currentTime = System.currentTimeMillis(); + long deadline = enqueueTime + delayKeyMsec; + if (currentTime >= deadline) { + delayedDeletes.remove(key); + LOG.debug("no longer delaying {}", key); + return false; + } else { + LOG.info("delaying {}", key); + return true; + } + } + + private void registerDeleteObject(String key, String bucket) { + if (shouldDelay(key)) { + // Record summary so we can add it back for some time post-deletion + S3ObjectSummary summary = null; + ObjectListing list = listObjects(bucket, key); + for (S3ObjectSummary result : list.getObjectSummaries()) { + if (result.getKey().equals(key)) { + summary = result; + break; + } + } + delayedDeletes.put(key, new Delete(System.currentTimeMillis(), summary)); + } + } + + private void registerPutObject(PutObjectRequest req) { + String key = req.getKey(); + if (shouldDelay(key)) { + enqueueDelayedPut(key); + } + } + + /** + * Should we delay listing visibility for this key? + * @param key key which is being put + * @return true if we should delay + */ + private boolean shouldDelay(String key) { + boolean delay = key.contains(delayKeySubstring); + delay = delay && trueWithProbability(delayKeyProbability); + LOG.debug("{} -> {}", key, delay); + return delay; + } + + + private boolean trueWithProbability(float p) { + return Math.random() < p; + } + + /** + * Record this key as something that should not become visible in + * listObject replies for a while, to simulate eventual list consistency. + * @param key key to delay visibility of + */ + private void enqueueDelayedPut(String key) { + LOG.debug("delaying put of {}", key); + delayedPutKeys.put(key, System.currentTimeMillis()); + } + + /** Since ObjectListing is immutable, we just override it with wrapper. 
*/ + private static class CustomObjectListing extends ObjectListing { + + private final List<S3ObjectSummary> customListing; + private final List<String> customPrefixes; + + CustomObjectListing(ObjectListing rawListing, + List<S3ObjectSummary> customListing, + List<String> customPrefixes) { + super(); + this.customListing = customListing; + this.customPrefixes = customPrefixes; + + this.setBucketName(rawListing.getBucketName()); + this.setCommonPrefixes(rawListing.getCommonPrefixes()); + this.setDelimiter(rawListing.getDelimiter()); + this.setEncodingType(rawListing.getEncodingType()); + this.setMarker(rawListing.getMarker()); + this.setMaxKeys(rawListing.getMaxKeys()); + this.setNextMarker(rawListing.getNextMarker()); + this.setPrefix(rawListing.getPrefix()); + this.setTruncated(rawListing.isTruncated()); + } + + @Override + public List<S3ObjectSummary> getObjectSummaries() { + return customListing; + } + + @Override + public List<String> getCommonPrefixes() { + return customPrefixes; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java new file mode 100644 index 0000000..17d268b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * S3 Client factory used for testing with eventual consistency fault injection. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class InconsistentS3ClientFactory extends DefaultS3ClientFactory { + + @Override + protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials, + ClientConfiguration awsConf) { + LOG.warn("** FAILURE INJECTION ENABLED. Do not run in production!
**"); + return new InconsistentAmazonS3Client(credentials, awsConf, getConf()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 30d8e6f..8efa218 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -22,18 +22,25 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; + +import com.google.common.base.Preconditions; import org.slf4j.Logger; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; +import java.util.Set; import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX; import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus; @@ -54,6 +61,43 @@ public class Listing { } /** + * Create a FileStatus iterator against a provided list of file status, with + * a given status filter. + * + * @param fileStatuses the provided list of file status. NO remote calls. + * @param filter file path filter on which paths to accept + * @param acceptor the file status acceptor + * @return the file status iterator + */ + ProvidedFileStatusIterator createProvidedFileStatusIterator( + FileStatus[] fileStatuses, + PathFilter filter, + FileStatusAcceptor acceptor) { + return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor); + } + + /** + * Create a FileStatus iterator against a path, with a given list object + * request. + * + * @param listPath path of the listing + * @param request initial request to make + * @param filter the filter on which paths to accept + * @param acceptor the class/predicate to decide which entries to accept + * in the listing based on the full file status. + * @return the iterator + * @throws IOException IO Problems + */ + FileStatusListingIterator createFileStatusListingIterator( + Path listPath, + ListObjectsRequest request, + PathFilter filter, + Listing.FileStatusAcceptor acceptor) throws IOException { + return createFileStatusListingIterator(listPath, request, filter, acceptor, + null); + } + + /** * Create a FileStatus iterator against a path, with a given * list object request. * @param listPath path of the listing @@ -61,6 +105,8 @@ public class Listing { * @param filter the filter on which paths to accept * @param acceptor the class/predicate to decide which entries to accept * in the listing based on the full file status. + * @param providedStatus the provided list of file status, which may contain + * items that are not listed from source. 
* @return the iterator * @throws IOException IO Problems */ @@ -68,11 +114,13 @@ public class Listing { Path listPath, ListObjectsRequest request, PathFilter filter, - Listing.FileStatusAcceptor acceptor) throws IOException { + Listing.FileStatusAcceptor acceptor, + RemoteIterator<FileStatus> providedStatus) throws IOException { return new FileStatusListingIterator( new ObjectListingIterator(listPath, request), filter, - acceptor); + acceptor, + providedStatus); } /** @@ -80,12 +128,27 @@ public class Listing { * @param statusIterator an iterator over the remote status entries * @return a new remote iterator */ + @VisibleForTesting LocatedFileStatusIterator createLocatedFileStatusIterator( RemoteIterator<FileStatus> statusIterator) { return new LocatedFileStatusIterator(statusIterator); } /** + * Create a located status iterator that wraps another to filter out a set + * of recently deleted items. + * @param iterator an iterator over the remote located status entries. + * @param tombstones set of paths that are recently deleted and should be + * filtered. + * @return a new remote iterator. + */ + @VisibleForTesting + TombstoneReconcilingIterator createTombstoneReconcilingIterator( + RemoteIterator<LocatedFileStatus> iterator, Set<Path> tombstones) { + return new TombstoneReconcilingIterator(iterator, tombstones); + } + + /** * Interface to implement by the logic deciding whether to accept a summary * entry or path as a valid file or directory. */ @@ -108,6 +171,13 @@ public class Listing { * should be generated.) */ boolean accept(Path keyPath, String commonPrefix); + + /** + * Predicate to decide whether or not to accept a file status. + * @param status file status containing file path information + * @return true if the status is accepted else false + */ + boolean accept(FileStatus status); } /** @@ -115,9 +185,9 @@ public class Listing { * value. * * If the status value is null, the iterator declares that it has no data. - * This iterator is used to handle {@link listStatus()} calls where the path - * handed in refers to a file, not a directory: this is the iterator - * returned. + * This iterator is used to handle {@link S3AFileSystem#listStatus} calls + * where the path handed in refers to a file, not a directory: this is the + * iterator returned. */ static final class SingleStatusRemoteIterator implements RemoteIterator<LocatedFileStatus> { @@ -169,6 +239,47 @@ public class Listing { } /** + * This wraps up a provided non-null list of file status as a remote iterator. + * + * It first filters the provided list; later {@link #next} calls return + * entries from the filtered list. This suffers from scalability issues if the + * provided list is too large. + * + * There is no remote data to fetch.
+ */ + static class ProvidedFileStatusIterator + implements RemoteIterator<FileStatus> { + private final ArrayList<FileStatus> filteredStatusList; + private int index = 0; + + ProvidedFileStatusIterator(FileStatus[] fileStatuses, PathFilter filter, + FileStatusAcceptor acceptor) { + Preconditions.checkArgument(fileStatuses != null, "Null status list!"); + + filteredStatusList = new ArrayList<>(fileStatuses.length); + for (FileStatus status : fileStatuses) { + if (filter.accept(status.getPath()) && acceptor.accept(status)) { + filteredStatusList.add(status); + } + } + filteredStatusList.trimToSize(); + } + + @Override + public boolean hasNext() throws IOException { + return index < filteredStatusList.size(); + } + + @Override + public FileStatus next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return filteredStatusList.get(index++); + } + } + + /** * Wraps up object listing into a remote iterator which will ask for more * listing data if needed. * @@ -179,7 +290,7 @@ public class Listing { * iterator can declare that there is more data available. * * The need to filter the results precludes the iterator from simply - * declaring that if the {@link S3AFileSystem.ObjectListingIterator#hasNext()} + * declaring that if the {@link ObjectListingIterator#hasNext()} * is true then there are more results. Instead the next batch of results must * be retrieved and filtered. * @@ -208,20 +319,33 @@ public class Listing { /** Iterator over the current set of results. */ private ListIterator<FileStatus> statusBatchIterator; + private final Set<FileStatus> providedStatus; + private Iterator<FileStatus> providedStatusIterator; + /** * Create an iterator over file status entries. * @param source the listing iterator from a listObjects call. * @param filter the filter on which paths to accept * @param acceptor the class/predicate to decide which entries to accept * in the listing based on the full file status. + * @param providedStatus the provided list of file status, which may contain + * items that are not listed from source. * @throws IOException IO Problems */ FileStatusListingIterator(ObjectListingIterator source, PathFilter filter, - FileStatusAcceptor acceptor) throws IOException { + FileStatusAcceptor acceptor, + RemoteIterator<FileStatus> providedStatus) throws IOException { this.source = source; this.filter = filter; this.acceptor = acceptor; + this.providedStatus = new HashSet<>(); + for (; providedStatus != null && providedStatus.hasNext();) { + final FileStatus status = providedStatus.next(); + if (filter.accept(status.getPath()) && acceptor.accept(status)) { + this.providedStatus.add(status); + } + } // build the first set of results. This will not trigger any // remote IO, assuming the source iterator is in its initial // iteration @@ -233,26 +357,53 @@ public class Listing { * If there is data in the local filtered list, return true. * Else: request more data until that condition is met, or there * is no more remote listing data. + * Lastly, return true if the {@code providedStatusIterator} + * has remaining items. * @return true if a call to {@link #next()} will succeed.
* @throws IOException */ @Override public boolean hasNext() throws IOException { - return statusBatchIterator.hasNext() || requestNextBatch(); + return sourceHasNext() || providedStatusIterator.hasNext(); + } + + private boolean sourceHasNext() throws IOException { + if (statusBatchIterator.hasNext() || requestNextBatch()) { + return true; + } else { + // turn to file status that are only in provided list + if (providedStatusIterator == null) { + LOG.debug("Start iterating the provided status."); + providedStatusIterator = providedStatus.iterator(); + } + return false; + } } @Override public FileStatus next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); + final FileStatus status; + if (sourceHasNext()) { + status = statusBatchIterator.next(); + // We remove from provided list the file status listed by S3 so that + // this does not return duplicate items. + LOG.debug("Removing the status from provided file status {}", status); + providedStatus.remove(status); + } else { + if (providedStatusIterator.hasNext()) { + status = providedStatusIterator.next(); + LOG.debug("Returning provided file status {}", status); + } else { + throw new NoSuchElementException(); + } } - return statusBatchIterator.next(); + return status; } /** * Try to retrieve another batch. * Note that for the initial batch, - * {@link S3AFileSystem.ObjectListingIterator} does not generate a request; + * {@link ObjectListingIterator} does not generate a request; * it simply returns the initial set. * * @return true if a new batch was created. @@ -312,7 +463,7 @@ public class Listing { for (String prefix : objects.getCommonPrefixes()) { Path keyPath = owner.keyToQualifiedPath(prefix); if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) { - FileStatus status = new S3AFileStatus(false, keyPath, + FileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath, owner.getUsername()); LOG.debug("Adding directory: {}", status); added++; @@ -352,7 +503,7 @@ public class Listing { * instance. * * 2. Second and later invocations will continue the ongoing listing, - * calling {@link #continueListObjects(ObjectListing)} to request the next + * calling {@link S3AFileSystem#continueListObjects} to request the next * batch of results. * * 3. The {@link #hasNext()} predicate returns true for the initial call, @@ -504,6 +655,11 @@ public class Listing { public boolean accept(Path keyPath, String prefix) { return false; } + + @Override + public boolean accept(FileStatus status) { + return (status != null) && status.isFile(); + } } /** @@ -534,6 +690,80 @@ public class Listing { } /** + * Wraps another iterator and filters out files that appear in the provided + * set of tombstones. Will read ahead in the iterator when necessary to + * ensure that emptiness is detected early enough if only deleted objects + * remain in the source iterator. 
+ */ + static class TombstoneReconcilingIterator implements + RemoteIterator<LocatedFileStatus> { + private LocatedFileStatus next = null; + private final RemoteIterator<LocatedFileStatus> iterator; + private final Set<Path> tombstones; + + /** + * @param iterator Source iterator to filter + * @param tombstones set of tombstone markers to filter out of results + */ + TombstoneReconcilingIterator(RemoteIterator<LocatedFileStatus> + iterator, Set<Path> tombstones) { + this.iterator = iterator; + if (tombstones != null) { + this.tombstones = tombstones; + } else { + this.tombstones = Collections.EMPTY_SET; + } + } + + private boolean fetch() throws IOException { + while (next == null && iterator.hasNext()) { + LocatedFileStatus candidate = iterator.next(); + if (!tombstones.contains(candidate.getPath())) { + next = candidate; + return true; + } + } + return false; + } + + public boolean hasNext() throws IOException { + if (next != null) { + return true; + } + return fetch(); + } + + public LocatedFileStatus next() throws IOException { + if (hasNext()) { + LocatedFileStatus result = next; + next = null; + fetch(); + return result; + } + throw new NoSuchElementException(); + } + } + + /** + * Accept all entries except those which map to S3N pseudo directory markers. + */ + static class AcceptAllButS3nDirs implements FileStatusAcceptor { + + public boolean accept(Path keyPath, S3ObjectSummary summary) { + return !summary.getKey().endsWith(S3N_FOLDER_SUFFIX); + } + + public boolean accept(Path keyPath, String prefix) { + return !keyPath.toString().endsWith(S3N_FOLDER_SUFFIX); + } + + public boolean accept(FileStatus status) { + return !status.getPath().toString().endsWith(S3N_FOLDER_SUFFIX); + } + + } + + /** * Accept all entries except the base path and those which map to S3N * pseudo directory markers. */ @@ -575,6 +805,11 @@ public class Listing { public boolean accept(Path keyPath, String prefix) { return !keyPath.equals(qualifiedPath); } + + @Override + public boolean accept(FileStatus status) { + return (status != null) && !status.getPath().equals(qualifiedPath); + } } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
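[Editor's note] The InconsistentAmazonS3Client added above is the core of the S3Guard fault-injection path: keys whose names contain the configured substring are hidden from listings for a while after a PUT and kept visible for a while after a DELETE. The snippet below is not part of the patch; it is a minimal sketch of how test code might drive that client directly. The bucket name, local file path, credentials provider and sleep-based timing are illustrative placeholders, not values taken from the commit.

import java.io.File;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.PutObjectRequest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client;

import static org.apache.hadoop.fs.s3a.Constants.*;

public class InconsistentListingSketch {

  // Drive the fault-injecting client directly: any key containing the
  // configured substring gets delayed listing visibility after a PUT.
  public static void demo(AWSCredentialsProvider credentials) throws Exception {
    Configuration conf = new Configuration();
    conf.set(FAIL_INJECT_INCONSISTENCY_KEY,
        InconsistentAmazonS3Client.DEFAULT_DELAY_KEY_SUBSTRING);
    conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, 5000);

    InconsistentAmazonS3Client s3 = new InconsistentAmazonS3Client(
        credentials, new ClientConfiguration(), conf);

    String bucket = "my-test-bucket";               // placeholder bucket
    String key = "test/DELAY_LISTING_ME/file.txt";  // matches the substring
    s3.putObject(new PutObjectRequest(bucket, key, new File("/tmp/file.txt")));

    // Immediately after the PUT, filterListObjects() hides the new key...
    ObjectListing before = s3.listObjects(bucket, "test/");
    System.out.println("visible now: " + before.getObjectSummaries().size());

    // ...but once the configured delay has elapsed it shows up again.
    Thread.sleep(6000);
    ObjectListing after = s3.listObjects(bucket, "test/");
    System.out.println("visible later: " + after.getObjectSummaries().size());

    // Reset all pending delayed puts/deletes between test cases.
    s3.clearInconsistency();
  }
}

In the patch itself the client is normally obtained through InconsistentS3ClientFactory rather than constructed directly as in this sketch.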

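[Editor's note] The Listing.java changes above interleave two sources of directory entries: the batches returned by S3 listObjects calls and a "provided" set of FileStatus entries coming from the S3Guard metadata store, with delete tombstones filtered out by TombstoneReconcilingIterator. The helper below is not part of the patch; it is a self-contained sketch of that merge rule, relying (as the patch does) on FileStatus equality and hashing being based on path: S3 entries win, duplicates from the provided set are dropped, and tombstoned paths are hidden.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

final class ListingMergeSketch {
  private ListingMergeSketch() {
  }

  // Merge an S3 listing with metadata-store entries: keep every S3 entry not
  // covered by a tombstone, then add any provided entry for a path S3 did not
  // report. FileStatus#equals/hashCode compare paths, which is what lets
  // Set#remove de-duplicate here.
  static List<FileStatus> merge(List<FileStatus> fromS3,
      Set<FileStatus> provided, Set<Path> tombstones) {
    List<FileStatus> result = new ArrayList<>();
    for (FileStatus status : fromS3) {
      if (!tombstones.contains(status.getPath())) {
        result.add(status);
      }
      provided.remove(status);   // listed by S3, so drop the provided copy
    }
    for (FileStatus status : provided) {
      if (!tombstones.contains(status.getPath())) {
        result.add(status);
      }
    }
    return result;
  }
}

In the patch the same effect is achieved incrementally as FileStatusListingIterator pages through the S3 listing, rather than on fully materialised lists as in this sketch.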