HADOOP-14964. AliyunOSS: backport Aliyun OSS module to branch-2. Contributed by Sammi Chen.
The consolidated commits in this backport are as follows:

  HADOOP-14787. AliyunOSS: Implement the `createNonRecursive` operator.
  HADOOP-14649. Update aliyun-sdk-oss version to 2.8.1. (Genmao Yu via rchiang)
  HADOOP-14194. Aliyun OSS should not use empty endpoint as default. Contributed by Genmao Yu
  HADOOP-14466. Remove useless document from TestAliyunOSSFileSystemContract.java. Contributed by Chen Liang.
  HADOOP-14458. Add missing imports to TestAliyunOSSFileSystemContract.java. Contributed by Mingliang Liu.
  HADOOP-14192. AliyunOSS FileSystem contract test should implement getTestBaseDir(). Contributed by Mingliang Liu
  HADOOP-14072. AliyunOSS: Failed to read from stream when seek beyond the download size. Contributed by Genmao Yu
  HADOOP-13769. AliyunOSS: update oss sdk version. Contributed by Genmao Yu
  HADOOP-14069. AliyunOSS: listStatus returns wrong file info. Contributed by Fei Hui
  HADOOP-13768. AliyunOSS: handle the failure in the batch delete operation `deleteDirs`. Contributed by Genmao Yu
  HADOOP-14065. AliyunOSS: oss directory filestatus should use meta time. Contributed by Fei Hui
  HADOOP-14045. Aliyun OSS documentation missing from website. Contributed by Yiqun Lin.
  HADOOP-13723. AliyunOSSInputStream#read() should update read bytes stat correctly. Contributed by Mingliang Liu
  HADOOP-13624. Rename TestAliyunOSSContractDispCp. Contributed by Genmao Yu
  HADOOP-13591. Unit test failure in TestOSSContractGetFileStatus and TestOSSContractRootDir. Contributed by Genmao Yu
  HADOOP-13481. User documents for Aliyun OSS FileSystem. Contributed by Genmao Yu.
  HADOOP-12756. Incorporate Aliyun OSS file system implementation. Contributed by Mingfei Shi and Lin Zhou

(cherry picked from commit 30ab9b6aef2e3d31f2a8fc9211b5324b3d42f18e)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b756beb6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b756beb6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b756beb6

Branch: refs/heads/branch-2.9
Commit: b756beb6793f1d283703749c1fab92a42325ef6e
Parents: 12901cd
Author: Sammi Chen <sammi.c...@intel.com>
Authored: Mon Nov 27 11:35:17 2017 +0800
Committer: Sammi Chen <sammi.c...@intel.com>
Committed: Mon Nov 27 11:35:17 2017 +0800

----------------------------------------------------------------------
 hadoop-project/pom.xml                          |  22 +-
 .../dev-support/findbugs-exclude.xml            |  18 +
 hadoop-tools/hadoop-aliyun/pom.xml              | 147 +++
 .../aliyun/oss/AliyunCredentialsProvider.java   |  87 +++
 .../fs/aliyun/oss/AliyunOSSFileSystem.java      | 608 +++++++++++++++++++
 .../fs/aliyun/oss/AliyunOSSFileSystemStore.java | 549 +++++++++++++++++
 .../fs/aliyun/oss/AliyunOSSInputStream.java     | 262 ++++++++
 .../fs/aliyun/oss/AliyunOSSOutputStream.java    | 111 ++++
 .../hadoop/fs/aliyun/oss/AliyunOSSUtils.java    | 167 +++++
 .../apache/hadoop/fs/aliyun/oss/Constants.java  | 113 ++++
 .../hadoop/fs/aliyun/oss/package-info.java      |  22 +
 .../site/markdown/tools/hadoop-aliyun/index.md  | 294 +++++++++
 .../fs/aliyun/oss/AliyunOSSTestUtils.java       |  77 +++
 .../fs/aliyun/oss/TestAliyunCredentials.java    |  78 +++
 .../oss/TestAliyunOSSFileSystemContract.java    | 218 +++++++
 .../oss/TestAliyunOSSFileSystemStore.java       | 125 ++++
 .../fs/aliyun/oss/TestAliyunOSSInputStream.java | 155 +++++
 .../aliyun/oss/TestAliyunOSSOutputStream.java   |  91 +++
 .../aliyun/oss/contract/AliyunOSSContract.java  |  49 ++
 .../contract/TestAliyunOSSContractCreate.java   |  35 ++
 .../contract/TestAliyunOSSContractDelete.java   |  34 ++
 .../contract/TestAliyunOSSContractDistCp.java   |  44 ++
 .../TestAliyunOSSContractGetFileStatus.java     |  35 ++
 .../contract/TestAliyunOSSContractMkdir.java    |  34 ++
 .../oss/contract/TestAliyunOSSContractOpen.java |  34 ++
 .../contract/TestAliyunOSSContractRename.java   |  35 ++
 .../contract/TestAliyunOSSContractRootDir.java  |  69 +++
 .../oss/contract/TestAliyunOSSContractSeek.java |  60 ++
 .../src/test/resources/contract/aliyun-oss.xml  | 120 ++++
 .../src/test/resources/core-site.xml            |  46 ++
 .../src/test/resources/log4j.properties         |  23 +
 hadoop-tools/hadoop-tools-dist/pom.xml          |   6 +
 hadoop-tools/pom.xml                            |   1 +
 33 files changed, 3768 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 4eac7d5..4bb640f 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -483,7 +483,11 @@
         <artifactId>hadoop-aws</artifactId>
         <version>${project.version}</version>
       </dependency>
-
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-aliyun</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-kms</artifactId>
@@ -1078,6 +1082,22 @@
         <version>2.9.1</version>
       </dependency>
 
+      <dependency>
+        <groupId>com.aliyun.oss</groupId>
+        <artifactId>aliyun-sdk-oss</artifactId>
+        <version>2.8.1</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
new file mode 100644
index 0000000..40d78d0
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
@@ -0,0 +1,18 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+--> +<FindBugsFilter> +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/pom.xml b/hadoop-tools/hadoop-aliyun/pom.xml new file mode 100644 index 0000000..357786b --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/pom.xml @@ -0,0 +1,147 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-project</artifactId> + <version>2.9.1-SNAPSHOT</version> + <relativePath>../../hadoop-project</relativePath> + </parent> + <artifactId>hadoop-aliyun</artifactId> + <name>Apache Hadoop Aliyun OSS support</name> + <packaging>jar</packaging> + + <properties> + <file.encoding>UTF-8</file.encoding> + <downloadSources>true</downloadSources> + </properties> + + <profiles> + <profile> + <id>tests-off</id> + <activation> + <file> + <missing>src/test/resources/auth-keys.xml</missing> + </file> + </activation> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + <profile> + <id>tests-on</id> + <activation> + <file> + <exists>src/test/resources/auth-keys.xml</exists> + </file> + </activation> + <properties> + <maven.test.skip>false</maven.test.skip> + </properties> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <findbugsXmlOutput>true</findbugsXmlOutput> + <xmlOutput>true</xmlOutput> + <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml + </excludeFilterFile> + <effort>Max</effort> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>deplist</id> + <phase>compile</phase> + <goals> + <goal>list</goal> + </goals> + <configuration> + <!-- build a shellprofile --> + <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.aliyun.oss</groupId> + <artifactId>aliyun-sdk-oss</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-common</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-distcp</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-distcp</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java new file mode 100644 index 0000000..b46c67a --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import com.aliyun.oss.common.auth.Credentials; +import com.aliyun.oss.common.auth.CredentialsProvider; +import com.aliyun.oss.common.auth.DefaultCredentials; +import com.aliyun.oss.common.auth.InvalidCredentialsException; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Support session credentials for authenticating with Aliyun. 
+ */ +public class AliyunCredentialsProvider implements CredentialsProvider { + private Credentials credentials = null; + + public AliyunCredentialsProvider(Configuration conf) + throws IOException { + String accessKeyId; + String accessKeySecret; + String securityToken; + try { + accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID); + accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET); + } catch (IOException e) { + throw new InvalidCredentialsException(e); + } + + try { + securityToken = AliyunOSSUtils.getValueWithKey(conf, SECURITY_TOKEN); + } catch (IOException e) { + securityToken = null; + } + + if (StringUtils.isEmpty(accessKeyId) + || StringUtils.isEmpty(accessKeySecret)) { + throw new InvalidCredentialsException( + "AccessKeyId and AccessKeySecret should not be null or empty."); + } + + if (StringUtils.isNotEmpty(securityToken)) { + credentials = new DefaultCredentials(accessKeyId, accessKeySecret, + securityToken); + } else { + credentials = new DefaultCredentials(accessKeyId, accessKeySecret); + } + } + + @Override + public void setCredentials(Credentials creds) { + if (creds == null) { + throw new InvalidCredentialsException("Credentials should not be null."); + } + + credentials = creds; + } + + @Override + public Credentials getCredentials() { + if (credentials == null) { + throw new InvalidCredentialsException("Invalid credentials"); + } + + return credentials; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java new file mode 100644 index 0000000..3561b02 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -0,0 +1,608 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.aliyun.oss; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import com.aliyun.oss.model.OSSObjectSummary; +import com.aliyun.oss.model.ObjectListing; +import com.aliyun.oss.model.ObjectMetadata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Implementation of {@link FileSystem} for <a href="https://oss.aliyun.com"> + * Aliyun OSS</a>, used to access OSS blob system in a filesystem style. + */ +public class AliyunOSSFileSystem extends FileSystem { + private static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSFileSystem.class); + private URI uri; + private String bucket; + private Path workingDir; + private AliyunOSSFileSystemStore store; + private int maxKeys; + + @Override + public FSDataOutputStream append(Path path, int bufferSize, + Progressable progress) throws IOException { + throw new IOException("Append is not supported!"); + } + + @Override + public void close() throws IOException { + try { + store.close(); + } finally { + super.close(); + } + } + + @Override + public FSDataOutputStream create(Path path, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + String key = pathToKey(path); + FileStatus status = null; + + try { + // get the status or throw a FNFE + status = getFileStatus(path); + + // if the thread reaches here, there is something at the path + if (status.isDirectory()) { + // path references a directory + throw new FileAlreadyExistsException(path + " is a directory"); + } + if (!overwrite) { + // path references a file and overwrite is disabled + throw new FileAlreadyExistsException(path + " already exists"); + } + LOG.debug("Overwriting file {}", path); + } catch (FileNotFoundException e) { + // this means the file is not found + } + + return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(), + store, key, progress, statistics), (Statistics)(null)); + } + + /** + * {@inheritDoc} + * @throws FileNotFoundException if the parent directory is not present -or + * is not a directory. 
+ */ + @Override + public FSDataOutputStream createNonRecursive(Path path, + FsPermission permission, + EnumSet<CreateFlag> flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + Path parent = path.getParent(); + if (parent != null) { + // expect this to raise an exception if there is no parent + if (!getFileStatus(parent).isDirectory()) { + throw new FileAlreadyExistsException("Not a directory: " + parent); + } + } + return create(path, permission, + flags.contains(CreateFlag.OVERWRITE), bufferSize, + replication, blockSize, progress); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + try { + return innerDelete(getFileStatus(path), recursive); + } catch (FileNotFoundException e) { + LOG.debug("Couldn't delete {} - does not exist", path); + return false; + } + } + + /** + * Delete an object. See {@link #delete(Path, boolean)}. + * + * @param status fileStatus object + * @param recursive if path is a directory and set to + * true, the directory is deleted else throws an exception. In + * case of a file the recursive can be set to either true or false. + * @return true if delete is successful else false. + * @throws IOException due to inability to delete a directory or file. + */ + private boolean innerDelete(FileStatus status, boolean recursive) + throws IOException { + Path f = status.getPath(); + String p = f.toUri().getPath(); + FileStatus[] statuses; + // indicating root directory "/". + if (p.equals("/")) { + statuses = listStatus(status.getPath()); + boolean isEmptyDir = statuses.length <= 0; + return rejectRootDirectoryDelete(isEmptyDir, recursive); + } + + String key = pathToKey(f); + if (status.isDirectory()) { + if (!recursive) { + // Check whether it is an empty directory or not + statuses = listStatus(status.getPath()); + if (statuses.length > 0) { + throw new IOException("Cannot remove directory " + f + + ": It is not empty!"); + } else { + // Delete empty directory without '-r' + key = AliyunOSSUtils.maybeAddTrailingSlash(key); + store.deleteObject(key); + } + } else { + store.deleteDirs(key); + } + } else { + store.deleteObject(key); + } + + createFakeDirectoryIfNecessary(f); + return true; + } + + /** + * Implements the specific logic to reject root directory deletion. + * The caller must return the result of this call, rather than + * attempt to continue with the delete operation: deleting root + * directories is never allowed. This method simply implements + * the policy of when to return an exit code versus raise an exception. + * @param isEmptyDir empty directory or not + * @param recursive recursive flag from command + * @return a return code for the operation + * @throws PathIOException if the operation was explicitly rejected. 
+   */
+  private boolean rejectRootDirectoryDelete(boolean isEmptyDir,
+      boolean recursive) throws IOException {
+    LOG.info("Delete the root directory of bucket {}; recursive = {}",
+        bucket, recursive);
+    if (isEmptyDir) {
+      return true;
+    }
+    if (recursive) {
+      return false;
+    } else {
+      // reject
+      throw new PathIOException(bucket, "Cannot delete root path");
+    }
+  }
+
+  private void createFakeDirectoryIfNecessary(Path f) throws IOException {
+    String key = pathToKey(f);
+    if (StringUtils.isNotEmpty(key) && !exists(f)) {
+      LOG.debug("Creating new fake directory at {}", f);
+      mkdir(pathToKey(f.getParent()));
+    }
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    Path qualifiedPath = path.makeQualified(uri, workingDir);
+    String key = pathToKey(qualifiedPath);
+
+    // Root always exists
+    if (key.length() == 0) {
+      return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
+    }
+
+    ObjectMetadata meta = store.getObjectMetadata(key);
+    // If key not found and key does not end with "/"
+    if (meta == null && !key.endsWith("/")) {
+      // In case of 'dir + "/"'
+      key += "/";
+      meta = store.getObjectMetadata(key);
+    }
+    if (meta == null) {
+      ObjectListing listing = store.listObjects(key, 1, null, false);
+      if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
+          CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
+        return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
+      } else {
+        throw new FileNotFoundException(path + ": No such file or directory!");
+      }
+    } else if (objectRepresentsDirectory(key, meta.getContentLength())) {
+      return new FileStatus(0, true, 1, 0, meta.getLastModified().getTime(),
+          qualifiedPath);
+    } else {
+      return new FileStatus(meta.getContentLength(), false, 1,
+          getDefaultBlockSize(path), meta.getLastModified().getTime(),
+          qualifiedPath);
+    }
+  }
+
+  @Override
+  public String getScheme() {
+    return "oss";
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Deprecated
+  public long getDefaultBlockSize() {
+    return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT);
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    // Does not support Token
+    return null;
+  }
+
+  /**
+   * Initialize new FileSystem.
+   *
+   * @param name the uri of the file system, including host, port, etc.
+   * @param conf configuration of the file system
+   * @throws IOException IO problems
+   */
+  public void initialize(URI name, Configuration conf) throws IOException {
+    super.initialize(name, conf);
+
+    bucket = name.getHost();
+    uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
+    workingDir = new Path("/user",
+        System.getProperty("user.name")).makeQualified(uri, null);
+
+    store = new AliyunOSSFileSystemStore();
+    store.initialize(name, conf, statistics);
+    maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
+    setConf(conf);
+  }
+
+  /**
+   * Check if OSS object represents a directory.
+   *
+   * @param name object key
+   * @param size object content length
+   * @return true if object represents a directory
+   */
+  private boolean objectRepresentsDirectory(final String name,
+      final long size) {
+    return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
+  }
+
+  /**
+   * Turn a path (relative or otherwise) into an OSS key.
+   *
+   * @param path the path of the file.
+   * @return the key of the object that represents the file.
+ */ + private String pathToKey(Path path) { + if (!path.isAbsolute()) { + path = new Path(workingDir, path); + } + + return path.toUri().getPath().substring(1); + } + + private Path keyToPath(String key) { + return new Path("/" + key); + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + String key = pathToKey(path); + if (LOG.isDebugEnabled()) { + LOG.debug("List status for path: " + path); + } + + final List<FileStatus> result = new ArrayList<FileStatus>(); + final FileStatus fileStatus = getFileStatus(path); + + if (fileStatus.isDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: doing listObjects for directory " + key); + } + + ObjectListing objects = store.listObjects(key, maxKeys, null, false); + while (true) { + statistics.incrementReadOps(1); + for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { + String objKey = objectSummary.getKey(); + if (objKey.equals(key + "/")) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring: " + objKey); + } + continue; + } else { + Path keyPath = keyToPath(objectSummary.getKey()) + .makeQualified(uri, workingDir); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: fi: " + keyPath); + } + result.add(new FileStatus(objectSummary.getSize(), false, 1, + getDefaultBlockSize(keyPath), + objectSummary.getLastModified().getTime(), keyPath)); + } + } + + for (String prefix : objects.getCommonPrefixes()) { + if (prefix.equals(key + "/")) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring: " + prefix); + } + continue; + } else { + Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd: " + keyPath); + } + result.add(getFileStatus(keyPath)); + } + } + + if (objects.isTruncated()) { + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: list truncated - getting next batch"); + } + String nextMarker = objects.getNextMarker(); + objects = store.listObjects(key, maxKeys, nextMarker, false); + statistics.incrementReadOps(1); + } else { + break; + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd (not a dir): " + path); + } + result.add(fileStatus); + } + + return result.toArray(new FileStatus[result.size()]); + } + + /** + * Used to create an empty file that represents an empty directory. + * + * @param key directory path + * @return true if directory is successfully created + * @throws IOException + */ + private boolean mkdir(final String key) throws IOException { + String dirName = key; + if (StringUtils.isNotEmpty(key)) { + if (!key.endsWith("/")) { + dirName += "/"; + } + store.storeEmptyFile(dirName); + } + return true; + } + + @Override + public boolean mkdirs(Path path, FsPermission permission) + throws IOException { + try { + FileStatus fileStatus = getFileStatus(path); + + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + path); + } + } catch (FileNotFoundException e) { + validatePath(path); + String key = pathToKey(path); + return mkdir(key); + } + } + + /** + * Check whether the path is a valid path. + * + * @param path the path to be checked. 
+ * @throws IOException + */ + private void validatePath(Path path) throws IOException { + Path fPart = path.getParent(); + do { + try { + FileStatus fileStatus = getFileStatus(fPart); + if (fileStatus.isDirectory()) { + // If path exists and a directory, exit + break; + } else { + throw new FileAlreadyExistsException(String.format( + "Can't make directory for path '%s', it is a file.", fPart)); + } + } catch (FileNotFoundException fnfe) { + } + fPart = fPart.getParent(); + } while (fPart != null); + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + final FileStatus fileStatus = getFileStatus(path); + if (fileStatus.isDirectory()) { + throw new FileNotFoundException("Can't open " + path + + " because it is a directory"); + } + + return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store, + pathToKey(path), fileStatus.getLen(), statistics)); + } + + @Override + public boolean rename(Path srcPath, Path dstPath) throws IOException { + if (srcPath.isRoot()) { + // Cannot rename root of file system + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot rename the root of a filesystem"); + } + return false; + } + Path parent = dstPath.getParent(); + while (parent != null && !srcPath.equals(parent)) { + parent = parent.getParent(); + } + if (parent != null) { + return false; + } + FileStatus srcStatus = getFileStatus(srcPath); + FileStatus dstStatus; + try { + dstStatus = getFileStatus(dstPath); + } catch (FileNotFoundException fnde) { + dstStatus = null; + } + if (dstStatus == null) { + // If dst doesn't exist, check whether dst dir exists or not + dstStatus = getFileStatus(dstPath.getParent()); + if (!dstStatus.isDirectory()) { + throw new IOException(String.format( + "Failed to rename %s to %s, %s is a file", srcPath, dstPath, + dstPath.getParent())); + } + } else { + if (srcStatus.getPath().equals(dstStatus.getPath())) { + return !srcStatus.isDirectory(); + } else if (dstStatus.isDirectory()) { + // If dst is a directory + dstPath = new Path(dstPath, srcPath.getName()); + FileStatus[] statuses; + try { + statuses = listStatus(dstPath); + } catch (FileNotFoundException fnde) { + statuses = null; + } + if (statuses != null && statuses.length > 0) { + // If dst exists and not a directory / not empty + throw new FileAlreadyExistsException(String.format( + "Failed to rename %s to %s, file already exists or not empty!", + srcPath, dstPath)); + } + } else { + // If dst is not a directory + throw new FileAlreadyExistsException(String.format( + "Failed to rename %s to %s, file already exists!", srcPath, + dstPath)); + } + } + if (srcStatus.isDirectory()) { + copyDirectory(srcPath, dstPath); + } else { + copyFile(srcPath, dstPath); + } + + return srcPath.equals(dstPath) || delete(srcPath, true); + } + + /** + * Copy file from source path to destination path. + * (the caller should make sure srcPath is a file and dstPath is valid) + * + * @param srcPath source path. + * @param dstPath destination path. + * @return true if file is successfully copied. + */ + private boolean copyFile(Path srcPath, Path dstPath) { + String srcKey = pathToKey(srcPath); + String dstKey = pathToKey(dstPath); + return store.copyFile(srcKey, dstKey); + } + + /** + * Copy a directory from source path to destination path. + * (the caller should make sure srcPath is a directory, and dstPath is valid) + * + * @param srcPath source path. + * @param dstPath destination path. + * @return true if directory is successfully copied. 
+ */ + private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { + String srcKey = AliyunOSSUtils + .maybeAddTrailingSlash(pathToKey(srcPath)); + String dstKey = AliyunOSSUtils + .maybeAddTrailingSlash(pathToKey(dstPath)); + + if (dstKey.startsWith(srcKey)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot rename a directory to a subdirectory of self"); + } + return false; + } + + store.storeEmptyFile(dstKey); + ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true); + statistics.incrementReadOps(1); + // Copy files from src folder to dst + while (true) { + for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { + String newKey = + dstKey.concat(objectSummary.getKey().substring(srcKey.length())); + store.copyFile(objectSummary.getKey(), newKey); + } + if (objects.isTruncated()) { + String nextMarker = objects.getNextMarker(); + objects = store.listObjects(srcKey, maxKeys, nextMarker, true); + statistics.incrementReadOps(1); + } else { + break; + } + } + return true; + } + + @Override + public void setWorkingDirectory(Path dir) { + this.workingDir = dir; + } + + public AliyunOSSFileSystemStore getStore() { + return store; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java new file mode 100644 index 0000000..aba3db8 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -0,0 +1,549 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs.aliyun.oss; + +import com.aliyun.oss.ClientConfiguration; +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.common.auth.CredentialsProvider; +import com.aliyun.oss.common.comm.Protocol; +import com.aliyun.oss.model.AbortMultipartUploadRequest; +import com.aliyun.oss.model.CannedAccessControlList; +import com.aliyun.oss.model.CompleteMultipartUploadRequest; +import com.aliyun.oss.model.CompleteMultipartUploadResult; +import com.aliyun.oss.model.CopyObjectResult; +import com.aliyun.oss.model.DeleteObjectsRequest; +import com.aliyun.oss.model.DeleteObjectsResult; +import com.aliyun.oss.model.GetObjectRequest; +import com.aliyun.oss.model.InitiateMultipartUploadRequest; +import com.aliyun.oss.model.InitiateMultipartUploadResult; +import com.aliyun.oss.model.ListObjectsRequest; +import com.aliyun.oss.model.ObjectMetadata; +import com.aliyun.oss.model.ObjectListing; +import com.aliyun.oss.model.OSSObjectSummary; +import com.aliyun.oss.model.PartETag; +import com.aliyun.oss.model.PutObjectResult; +import com.aliyun.oss.model.UploadPartCopyRequest; +import com.aliyun.oss.model.UploadPartCopyResult; +import com.aliyun.oss.model.UploadPartRequest; +import com.aliyun.oss.model.UploadPartResult; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Core implementation of Aliyun OSS Filesystem for Hadoop. + * Provides the bridging logic between Hadoop's abstract filesystem and + * Aliyun OSS. + */ +public class AliyunOSSFileSystemStore { + public static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSFileSystemStore.class); + private FileSystem.Statistics statistics; + private OSSClient ossClient; + private String bucketName; + private long uploadPartSize; + private long multipartThreshold; + private long partSize; + private int maxKeys; + private String serverSideEncryptionAlgorithm; + + public void initialize(URI uri, Configuration conf, + FileSystem.Statistics stat) throws IOException { + statistics = stat; + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY, + MAXIMUM_CONNECTIONS_DEFAULT)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY, + SECURE_CONNECTIONS_DEFAULT); + clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); + clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY, + MAX_ERROR_RETRIES_DEFAULT)); + clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY, + ESTABLISH_TIMEOUT_DEFAULT)); + clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY, + SOCKET_TIMEOUT_DEFAULT)); + + String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, ""); + int proxyPort = conf.getInt(PROXY_PORT_KEY, -1); + if (StringUtils.isNotEmpty(proxyHost)) { + clientConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + clientConf.setProxyPort(proxyPort); + } else { + if (secureConnections) { + LOG.warn("Proxy host set without port. 
Using HTTPS default 443"); + clientConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. Using HTTP default 80"); + clientConf.setProxyPort(80); + } + } + String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY); + String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " + + PROXY_PASSWORD_KEY + " set without the other."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + clientConf.setProxyUsername(proxyUsername); + clientConf.setProxyPassword(proxyPassword); + clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY)); + clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY)); + } else if (proxyPort >= 0) { + String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " + + PROXY_HOST_KEY; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + + String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); + if (StringUtils.isEmpty(endPoint)) { + throw new IllegalArgumentException("Aliyun OSS endpoint should not be " + + "null or empty. Please set proper endpoint with 'fs.oss.endpoint'."); + } + CredentialsProvider provider = + AliyunOSSUtils.getCredentialsProvider(conf); + ossClient = new OSSClient(endPoint, provider, clientConf); + uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, + MULTIPART_UPLOAD_SIZE_DEFAULT); + multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, + MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); + partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, + MULTIPART_UPLOAD_SIZE_DEFAULT); + if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) { + partSize = MIN_MULTIPART_UPLOAD_PART_SIZE; + } + serverSideEncryptionAlgorithm = + conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); + + if (uploadPartSize < 5 * 1024 * 1024) { + LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB"); + uploadPartSize = 5 * 1024 * 1024; + } + + if (multipartThreshold < 5 * 1024 * 1024) { + LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); + multipartThreshold = 5 * 1024 * 1024; + } + + if (multipartThreshold > 1024 * 1024 * 1024) { + LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB"); + multipartThreshold = 1024 * 1024 * 1024; + } + + String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT); + if (StringUtils.isNotEmpty(cannedACLName)) { + CannedAccessControlList cannedACL = + CannedAccessControlList.valueOf(cannedACLName); + ossClient.setBucketAcl(bucketName, cannedACL); + } + + maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); + bucketName = uri.getHost(); + } + + /** + * Delete an object, and update write operation statistics. + * + * @param key key to blob to delete. + */ + public void deleteObject(String key) { + ossClient.deleteObject(bucketName, key); + statistics.incrementWriteOps(1); + } + + /** + * Delete a list of keys, and update write operation statistics. + * + * @param keysToDelete collection of keys to delete. + * @throws IOException if failed to delete objects. 
+   */
+  public void deleteObjects(List<String> keysToDelete) throws IOException {
+    if (CollectionUtils.isEmpty(keysToDelete)) {
+      LOG.warn("Key list to delete is empty.");
+      return;
+    }
+
+    int retry = 10;
+    int tries = 0;
+    List<String> deleteFailed = keysToDelete;
+    while (CollectionUtils.isNotEmpty(deleteFailed)) {
+      DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName);
+      deleteRequest.setKeys(deleteFailed);
+      // There are two modes to do batch delete:
+      // 1. detail mode: DeleteObjectsResult.getDeletedObjects returns objects
+      // which were deleted successfully.
+      // 2. simple mode: DeleteObjectsResult.getDeletedObjects returns objects
+      // which were deleted unsuccessfully.
+      // Here, we choose the simple mode to do batch delete.
+      deleteRequest.setQuiet(true);
+      DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest);
+      deleteFailed = result.getDeletedObjects();
+      tries++;
+      if (tries == retry) {
+        break;
+      }
+    }
+
+    if (tries == retry && CollectionUtils.isNotEmpty(deleteFailed)) {
+      // Most of the time, all 10 attempts should not be needed; hitting
+      // this limit usually indicates Aliyun OSS service problems.
+      throw new IOException("Failed to delete Aliyun OSS objects after " +
+          tries + " retries.");
+    }
+  }
+
+  /**
+   * Delete a directory from Aliyun OSS.
+   *
+   * @param key directory key to delete.
+   * @throws IOException if failed to delete directory.
+   */
+  public void deleteDirs(String key) throws IOException {
+    key = AliyunOSSUtils.maybeAddTrailingSlash(key);
+    ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+    listRequest.setPrefix(key);
+    listRequest.setDelimiter(null);
+    listRequest.setMaxKeys(maxKeys);
+
+    while (true) {
+      ObjectListing objects = ossClient.listObjects(listRequest);
+      statistics.incrementReadOps(1);
+      List<String> keysToDelete = new ArrayList<String>();
+      for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
+        keysToDelete.add(objectSummary.getKey());
+      }
+      deleteObjects(keysToDelete);
+      if (objects.isTruncated()) {
+        listRequest.setMarker(objects.getNextMarker());
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Return metadata of a given object key.
+   *
+   * @param key object key.
+   * @return null if key does not exist.
+   */
+  public ObjectMetadata getObjectMetadata(String key) {
+    try {
+      return ossClient.getObjectMetadata(bucketName, key);
+    } catch (OSSException osse) {
+      return null;
+    } finally {
+      statistics.incrementReadOps(1);
+    }
+  }
+
+  /**
+   * Upload an empty file as an OSS object, using single upload.
+   *
+   * @param key object key.
+   * @throws IOException if failed to upload object.
+   */
+  public void storeEmptyFile(String key) throws IOException {
+    ObjectMetadata dirMeta = new ObjectMetadata();
+    byte[] buffer = new byte[0];
+    ByteArrayInputStream in = new ByteArrayInputStream(buffer);
+    dirMeta.setContentLength(0);
+    try {
+      ossClient.putObject(bucketName, key, in, dirMeta);
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Copy an object from source key to destination key.
+   *
+   * @param srcKey source key.
+   * @param dstKey destination key.
+   * @return true if file is successfully copied.
+   */
+  public boolean copyFile(String srcKey, String dstKey) {
+    ObjectMetadata objectMeta =
+        ossClient.getObjectMetadata(bucketName, srcKey);
+    long contentLength = objectMeta.getContentLength();
+    if (contentLength <= multipartThreshold) {
+      return singleCopy(srcKey, dstKey);
+    } else {
+      return multipartCopy(srcKey, contentLength, dstKey);
+    }
+  }
+
+  /**
+   * Use single copy to copy an OSS object.
+ * (The caller should make sure srcPath is a file and dstPath is valid) + * + * @param srcKey source key. + * @param dstKey destination key. + * @return true if object is successfully copied. + */ + private boolean singleCopy(String srcKey, String dstKey) { + CopyObjectResult copyResult = + ossClient.copyObject(bucketName, srcKey, bucketName, dstKey); + LOG.debug(copyResult.getETag()); + return true; + } + + /** + * Use multipart copy to copy an OSS object. + * (The caller should make sure srcPath is a file and dstPath is valid) + * + * @param srcKey source key. + * @param contentLength data size of the object to copy. + * @param dstKey destination key. + * @return true if success, or false if upload is aborted. + */ + private boolean multipartCopy(String srcKey, long contentLength, + String dstKey) { + long realPartSize = + AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize); + int partNum = (int) (contentLength / realPartSize); + if (contentLength % realPartSize != 0) { + partNum++; + } + InitiateMultipartUploadRequest initiateMultipartUploadRequest = + new InitiateMultipartUploadRequest(bucketName, dstKey); + ObjectMetadata meta = new ObjectMetadata(); + if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + initiateMultipartUploadRequest.setObjectMetadata(meta); + InitiateMultipartUploadResult initiateMultipartUploadResult = + ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); + String uploadId = initiateMultipartUploadResult.getUploadId(); + List<PartETag> partETags = new ArrayList<PartETag>(); + try { + for (int i = 0; i < partNum; i++) { + long skipBytes = realPartSize * i; + long size = (realPartSize < contentLength - skipBytes) ? + realPartSize : contentLength - skipBytes; + UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest(); + partCopyRequest.setSourceBucketName(bucketName); + partCopyRequest.setSourceKey(srcKey); + partCopyRequest.setBucketName(bucketName); + partCopyRequest.setKey(dstKey); + partCopyRequest.setUploadId(uploadId); + partCopyRequest.setPartSize(size); + partCopyRequest.setBeginIndex(skipBytes); + partCopyRequest.setPartNumber(i + 1); + UploadPartCopyResult partCopyResult = + ossClient.uploadPartCopy(partCopyRequest); + statistics.incrementWriteOps(1); + partETags.add(partCopyResult.getPartETag()); + } + CompleteMultipartUploadRequest completeMultipartUploadRequest = + new CompleteMultipartUploadRequest(bucketName, dstKey, + uploadId, partETags); + CompleteMultipartUploadResult completeMultipartUploadResult = + ossClient.completeMultipartUpload(completeMultipartUploadRequest); + LOG.debug(completeMultipartUploadResult.getETag()); + return true; + } catch (OSSException | ClientException e) { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(bucketName, dstKey, uploadId); + ossClient.abortMultipartUpload(abortMultipartUploadRequest); + return false; + } + } + + /** + * Upload a file as an OSS object, using single upload. + * + * @param key object key. + * @param file local file to upload. + * @throws IOException if failed to upload object. 
+ */ + public void uploadObject(String key, File file) throws IOException { + File object = file.getAbsoluteFile(); + FileInputStream fis = new FileInputStream(object); + ObjectMetadata meta = new ObjectMetadata(); + meta.setContentLength(object.length()); + if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + try { + PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta); + LOG.debug(result.getETag()); + statistics.incrementWriteOps(1); + } finally { + fis.close(); + } + } + + /** + * Upload a file as an OSS object, using multipart upload. + * + * @param key object key. + * @param file local file to upload. + * @throws IOException if failed to upload object. + */ + public void multipartUploadObject(String key, File file) throws IOException { + File object = file.getAbsoluteFile(); + long dataLen = object.length(); + long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize); + int partNum = (int) (dataLen / realPartSize); + if (dataLen % realPartSize != 0) { + partNum += 1; + } + + InitiateMultipartUploadRequest initiateMultipartUploadRequest = + new InitiateMultipartUploadRequest(bucketName, key); + ObjectMetadata meta = new ObjectMetadata(); + if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + initiateMultipartUploadRequest.setObjectMetadata(meta); + InitiateMultipartUploadResult initiateMultipartUploadResult = + ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); + List<PartETag> partETags = new ArrayList<PartETag>(); + String uploadId = initiateMultipartUploadResult.getUploadId(); + + try { + for (int i = 0; i < partNum; i++) { + //TODO Optimize this, avoid opening the object multiple times. + FileInputStream fis = new FileInputStream(object); + try { + long skipBytes = realPartSize * i; + AliyunOSSUtils.skipFully(fis, skipBytes); + long size = (realPartSize < dataLen - skipBytes) ? + realPartSize : dataLen - skipBytes; + UploadPartRequest uploadPartRequest = new UploadPartRequest(); + uploadPartRequest.setBucketName(bucketName); + uploadPartRequest.setKey(key); + uploadPartRequest.setUploadId(uploadId); + uploadPartRequest.setInputStream(fis); + uploadPartRequest.setPartSize(size); + uploadPartRequest.setPartNumber(i + 1); + UploadPartResult uploadPartResult = + ossClient.uploadPart(uploadPartRequest); + statistics.incrementWriteOps(1); + partETags.add(uploadPartResult.getPartETag()); + } finally { + fis.close(); + } + } + CompleteMultipartUploadRequest completeMultipartUploadRequest = + new CompleteMultipartUploadRequest(bucketName, key, + uploadId, partETags); + CompleteMultipartUploadResult completeMultipartUploadResult = + ossClient.completeMultipartUpload(completeMultipartUploadRequest); + LOG.debug(completeMultipartUploadResult.getETag()); + } catch (OSSException | ClientException e) { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(bucketName, key, uploadId); + ossClient.abortMultipartUpload(abortMultipartUploadRequest); + } + } + + /** + * list objects. + * + * @param prefix prefix. + * @param maxListingLength max no. of entries + * @param marker last key in any previous search. + * @param recursive whether to list directory recursively. + * @return a list of matches. + */ + public ObjectListing listObjects(String prefix, int maxListingLength, + String marker, boolean recursive) { + String delimiter = recursive ? 
null : "/"; + prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix); + ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); + listRequest.setPrefix(prefix); + listRequest.setDelimiter(delimiter); + listRequest.setMaxKeys(maxListingLength); + listRequest.setMarker(marker); + + ObjectListing listing = ossClient.listObjects(listRequest); + statistics.incrementReadOps(1); + return listing; + } + + /** + * Retrieve a part of an object. + * + * @param key the object name that is being retrieved from the Aliyun OSS. + * @param byteStart start position. + * @param byteEnd end position. + * @return This method returns null if the key is not found. + */ + public InputStream retrieve(String key, long byteStart, long byteEnd) { + try { + GetObjectRequest request = new GetObjectRequest(bucketName, key); + request.setRange(byteStart, byteEnd); + return ossClient.getObject(request).getObjectContent(); + } catch (OSSException | ClientException e) { + return null; + } + } + + /** + * Close OSS client properly. + */ + public void close() { + if (ossClient != null) { + ossClient.shutdown(); + ossClient = null; + } + } + + /** + * Clean up all objects matching the prefix. + * + * @param prefix Aliyun OSS object prefix. + * @throws IOException if failed to clean up objects. + */ + public void purge(String prefix) throws IOException { + String key; + try { + ObjectListing objects = listObjects(prefix, maxKeys, null, true); + for (OSSObjectSummary object : objects.getObjectSummaries()) { + key = object.getKey(); + ossClient.deleteObject(bucketName, key); + } + + for (String dir: objects.getCommonPrefixes()) { + deleteDirs(dir); + } + } catch (OSSException | ClientException e) { + LOG.error("Failed to purge " + prefix); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java new file mode 100644 index 0000000..3b2bc02 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+
+/**
+ * The input stream for the OSS blob system.
+ * The class uses multi-part downloading to read data from the object content
+ * stream.
+ */
+public class AliyunOSSInputStream extends FSInputStream {
+  public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class);
+  private final long downloadPartSize;
+  private AliyunOSSFileSystemStore store;
+  private final String key;
+  private Statistics statistics;
+  private boolean closed;
+  private InputStream wrappedStream = null;
+  private long contentLength;
+  private long position;
+  private long partRemaining;
+
+  public AliyunOSSInputStream(Configuration conf,
+      AliyunOSSFileSystemStore store, String key, Long contentLength,
+      Statistics statistics) throws IOException {
+    this.store = store;
+    this.key = key;
+    this.statistics = statistics;
+    this.contentLength = contentLength;
+    downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
+        MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+    reopen(0);
+    closed = false;
+  }
+
+  /**
+   * Reopen the wrapped stream at the given position, by seeking for
+   * data of a part length from the object content stream.
+   *
+   * @param pos position from start of a file
+   * @throws IOException if failed to reopen
+   */
+  private synchronized void reopen(long pos) throws IOException {
+    long partSize;
+
+    if (pos < 0) {
+      throw new EOFException("Cannot seek to a negative position: " + pos);
+    } else if (pos > contentLength) {
+      throw new EOFException("Cannot seek after EOF, contentLength: " +
+          contentLength + " position: " + pos);
+    } else if (pos + downloadPartSize > contentLength) {
+      partSize = contentLength - pos;
+    } else {
+      partSize = downloadPartSize;
+    }
+
+    if (wrappedStream != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Aborting old stream to open at pos " + pos);
+      }
+      wrappedStream.close();
+    }
+
+    wrappedStream = store.retrieve(key, pos, pos + partSize - 1);
+    if (wrappedStream == null) {
+      throw new IOException("Null IO stream");
+    }
+    position = pos;
+    partRemaining = partSize;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    checkNotClosed();
+
+    if (partRemaining <= 0 && position < contentLength) {
+      reopen(position);
+    }
+
+    int tries = MAX_RETRIES;
+    boolean retry;
+    int byteRead = -1;
+    do {
+      retry = false;
+      try {
+        byteRead = wrappedStream.read();
+      } catch (Exception e) {
+        handleReadException(e, --tries);
+        retry = true;
+      }
+    } while (retry);
+    if (byteRead >= 0) {
+      position++;
+      partRemaining--;
+    }
+
+    if (statistics != null && byteRead >= 0) {
+      statistics.incrementBytesRead(byteRead);
+    }
+    return byteRead;
+  }
+
+  /**
+   * Verify that the input stream is open. Non-blocking; this gives
+   * the last state of the {@link #closed} field.
+   *
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len)
+      throws IOException {
+    checkNotClosed();
+
+    if (buf == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || len < 0 || len > buf.length - off) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return 0;
+    }
+
+    int bytesRead = 0;
+    // Not EOF, and read not done
+    while (position < contentLength && bytesRead < len) {
+      if (partRemaining == 0) {
+        reopen(position);
+      }
+
+      int tries = MAX_RETRIES;
+      boolean retry;
+      int bytes = -1;
+      do {
+        retry = false;
+        try {
+          bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
+        } catch (Exception e) {
+          handleReadException(e, --tries);
+          retry = true;
+        }
+      } while (retry);
+
+      if (bytes > 0) {
+        bytesRead += bytes;
+        position += bytes;
+        partRemaining -= bytes;
+      } else if (partRemaining != 0) {
+        throw new IOException("Failed to read from stream. Remaining: "
+            + partRemaining);
+      }
+    }
+
+    if (statistics != null && bytesRead > 0) {
+      statistics.incrementBytesRead(bytesRead);
+    }
+
+    // Read nothing, but attempted to read something
+    if (bytesRead == 0 && len > 0) {
+      return -1;
+    } else {
+      return bytesRead;
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (wrappedStream != null) {
+      wrappedStream.close();
+    }
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    checkNotClosed();
+
+    long remaining = contentLength - position;
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    return (int) remaining;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    checkNotClosed();
+    if (position == pos) {
+      return;
+    } else if (pos > position && pos < position + partRemaining) {
+      long len = pos - position;
+      AliyunOSSUtils.skipFully(wrappedStream, len);
+      position = pos;
+      partRemaining -= len;
+    } else {
+      reopen(pos);
+    }
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    checkNotClosed();
+    return position;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    checkNotClosed();
+    return false;
+  }
+
+  private void handleReadException(Exception e, int tries) throws IOException {
+    if (tries == 0) {
+      throw new IOException(e);
+    }
+
+    LOG.warn("An exception occurred on the OSS connection; reopening the"
+        + " connection at position '" + position + "', " + e.getMessage());
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e2) {
+      LOG.warn(e2.getMessage());
+    }
+    reopen(position);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
new file mode 100644
index 0000000..c952d0a
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The output stream for the OSS blob system.
+ * Data will be buffered on local disk, then uploaded to OSS in the
+ * {@link #close()} method.
+ */
+public class AliyunOSSOutputStream extends OutputStream {
+  public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
+  private AliyunOSSFileSystemStore store;
+  private final String key;
+  private Statistics statistics;
+  private Progressable progress;
+  private long partSizeThreshold;
+  private LocalDirAllocator dirAlloc;
+  private boolean closed;
+  private File tmpFile;
+  private BufferedOutputStream backupStream;
+
+  public AliyunOSSOutputStream(Configuration conf,
+      AliyunOSSFileSystemStore store, String key, Progressable progress,
+      Statistics statistics) throws IOException {
+    this.store = store;
+    this.key = key;
+    // The caller can't get any progress information
+    this.progress = progress;
+    this.statistics = statistics;
+    partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
+        MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
+
+    if (conf.get(BUFFER_DIR_KEY) == null) {
+      conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
+    }
+    dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
+
+    tmpFile = dirAlloc.createTmpFileForWrite("output-",
+        LocalDirAllocator.SIZE_UNKNOWN, conf);
+    backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
+    closed = false;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (backupStream != null) {
+      backupStream.close();
+    }
+    long dataLen = tmpFile.length();
+    try {
+      if (dataLen <= partSizeThreshold) {
+        store.uploadObject(key, tmpFile);
+      } else {
+        store.multipartUploadObject(key, tmpFile);
+      }
+    } finally {
+      if (!tmpFile.delete()) {
+        LOG.warn("Cannot delete temporary file: " + tmpFile);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    backupStream.flush();
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    backupStream.write(b);
+    statistics.incrementBytesWritten(1);
+  }
+}
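The write path above stages all bytes in a local file under fs.oss.buffer.dir and only talks to OSS when close() runs, choosing a single PUT or a multipart upload by comparing the buffered file's length against fs.oss.multipart.upload.threshold; flush() therefore only flushes to local disk. A minimal usage sketch of driving this through the FileSystem API, assuming the endpoint, bucket, and object names below are placeholders and that the fs.oss access keys are configured elsewhere (e.g. in core-site.xml):

import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OssWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical endpoint; real deployments set this in core-site.xml.
    conf.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
    // Map the scheme explicitly in case core-default.xml does not.
    conf.set("fs.oss.impl",
        "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
    FileSystem fs = FileSystem.get(URI.create("oss://example-bucket/"), conf);
    // Bytes are buffered locally; the upload happens in close(), here as a
    // single PUT since the data is far below the 20 MB default threshold.
    try (FSDataOutputStream out = fs.create(new Path("/demo/hello.txt"))) {
      out.write("hello, oss".getBytes(StandardCharsets.UTF_8));
    }
    fs.close();
  }
}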
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
new file mode 100644
index 0000000..263b4cf
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ProviderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+/**
+ * Utility methods for Aliyun OSS code.
+ */
+public final class AliyunOSSUtils {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AliyunOSSUtils.class);
+
+  private AliyunOSSUtils() {
+  }
+
+  /**
+   * Used to get a password from the configuration.
+   *
+   * @param conf configuration that contains password information
+   * @param key the key of the password
+   * @return the value for the key
+   * @throws IOException if failed to get password from configuration
+   */
+  public static String getValueWithKey(Configuration conf, String key)
+      throws IOException {
+    try {
+      final char[] pass = conf.getPassword(key);
+      if (pass != null) {
+        return (new String(pass)).trim();
+      } else {
+        return "";
+      }
+    } catch (IOException ioe) {
+      throw new IOException("Cannot find password option " + key, ioe);
+    }
+  }
+
+  /**
+   * Skip the requested number of bytes or fail if there are not enough bytes
+   * left. This allows for the possibility that {@link InputStream#skip(long)}
+   * may not skip as many bytes as requested (most likely because of reaching
+   * EOF).
+   *
+   * @param is the input stream to skip.
+   * @param n the number of bytes to skip.
+   * @throws IOException if fewer bytes were skipped than requested.
+   */
+  public static void skipFully(InputStream is, long n) throws IOException {
+    long total = 0;
+    long cur = 0;
+
+    do {
+      cur = is.skip(n - total);
+      total += cur;
+    } while ((total < n) && (cur > 0));
+
+    if (total < n) {
+      throw new IOException("Failed to skip " + n + " bytes, possibly due "
+          + "to EOF.");
+    }
+  }
+
+  /**
+   * Calculate a proper size for the multipart pieces. If
+   * <code>minPartSize</code> is too small, the number of multipart pieces may
+   * exceed the limit of {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}.
+   *
+   * @param contentLength the size of the file.
+   * @param minPartSize the minimum size of a multipart piece.
+   * @return a revised size for each multipart piece.
+   */
+  public static long calculatePartSize(long contentLength, long minPartSize) {
+    long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1;
+    return Math.max(minPartSize, tmpPartSize);
+  }
+
+  /**
+   * Create the credential provider specified by the configuration, or the
+   * default credential provider if none is specified.
+   *
+   * @param conf configuration
+   * @return a credential provider
+   * @throws IOException on any problem. Class construction issues may be
+   * nested inside the IOE.
+   */
+  public static CredentialsProvider getCredentialsProvider(Configuration conf)
+      throws IOException {
+    CredentialsProvider credentials;
+
+    String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
+    if (StringUtils.isEmpty(className)) {
+      Configuration newConf =
+          ProviderUtils.excludeIncompatibleCredentialProviders(conf,
+              AliyunOSSFileSystem.class);
+      credentials = new AliyunCredentialsProvider(newConf);
+    } else {
+      try {
+        LOG.debug("Credential provider class is: " + className);
+        Class<?> credClass = Class.forName(className);
+        try {
+          credentials =
+              (CredentialsProvider)credClass.getDeclaredConstructor(
+                  Configuration.class).newInstance(conf);
+        } catch (NoSuchMethodException | SecurityException e) {
+          credentials =
+              (CredentialsProvider)credClass.getDeclaredConstructor()
+                  .newInstance();
+        }
+      } catch (ClassNotFoundException e) {
+        throw new IOException(className + " not found.", e);
+      } catch (NoSuchMethodException | SecurityException e) {
+        throw new IOException(String.format("%s constructor exception. A "
+            + "class specified in %s must provide an accessible constructor "
+            + "accepting a Configuration, or an accessible default "
+            + "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY),
+            e);
+      } catch (ReflectiveOperationException | IllegalArgumentException e) {
+        throw new IOException(className + " instantiation exception.", e);
+      }
+    }
+
+    return credentials;
+  }
+
+  /**
+   * Turns a path (relative or otherwise) into an OSS key, adding a trailing
+   * "/" if the path is not the root <i>and</i> does not already have a "/"
+   * at the end.
+   *
+   * @param key OSS key or ""
+   * @return the key with a trailing "/", or, if it is the root key, "".
+   */
+  public static String maybeAddTrailingSlash(String key) {
+    if (StringUtils.isNotEmpty(key) && !key.endsWith("/")) {
+      return key + '/';
+    } else {
+      return key;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
new file mode 100644
index 0000000..04a2ccd
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+/**
+ * All configuration constants for the OSS filesystem.
+ */
+public final class Constants {
+
+  private Constants() {
+  }
+
+  // Class of credential provider
+  public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY =
+      "fs.oss.credentials.provider";
+
+  // OSS access verification
+  public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
+  public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+  public static final String SECURITY_TOKEN = "fs.oss.securityToken";
+
+  // Number of simultaneous connections to OSS
+  public static final String MAXIMUM_CONNECTIONS_KEY =
+      "fs.oss.connection.maximum";
+  public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32;
+
+  // Connect to OSS over SSL
+  public static final String SECURE_CONNECTIONS_KEY =
+      "fs.oss.connection.secure.enabled";
+  public static final boolean SECURE_CONNECTIONS_DEFAULT = true;
+
+  // Use a custom endpoint
+  public static final String ENDPOINT_KEY = "fs.oss.endpoint";
+
+  // Connect to OSS through a proxy server
+  public static final String PROXY_HOST_KEY = "fs.oss.proxy.host";
+  public static final String PROXY_PORT_KEY = "fs.oss.proxy.port";
+  public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username";
+  public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password";
+  public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain";
+  public static final String PROXY_WORKSTATION_KEY =
+      "fs.oss.proxy.workstation";
+
+  // Number of times we should retry errors
+  public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum";
+  public static final int MAX_ERROR_RETRIES_DEFAULT = 20;
+
+  // Time until we give up trying to establish a connection to OSS
+  public static final String ESTABLISH_TIMEOUT_KEY =
+      "fs.oss.connection.establish.timeout";
+  public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000;
+
+  // Time until we give up on a connection to OSS
+  public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout";
+  public static final int SOCKET_TIMEOUT_DEFAULT = 200000;
+
+  // Number of records to get while paging through a directory listing
+  public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum";
+  public static final int MAX_PAGING_KEYS_DEFAULT = 1000;
+
+  // Size of each multipart piece in bytes
+  public static final String MULTIPART_UPLOAD_SIZE_KEY =
+      "fs.oss.multipart.upload.size";
+
+  public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
+  public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000;
+
+  // Minimum size in bytes before we start a multipart upload or copy
+  public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY =
+      "fs.oss.multipart.upload.threshold";
+  public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT =
+      20 * 1024 * 1024;
+
+  public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
+      "fs.oss.multipart.download.size";
+
+  public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
+
+  // Comma separated list of directories
"fs.oss.buffer.dir"; + + // private | public-read | public-read-write + public static final String CANNED_ACL_KEY = "fs.oss.acl.default"; + public static final String CANNED_ACL_DEFAULT = ""; + + // OSS server-side encryption + public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY = + "fs.oss.server-side-encryption-algorithm"; + + public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size"; + public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024; + public static final String FS_OSS = "oss"; + + public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L; + public static final int MAX_RETRIES = 10; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b756beb6/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java new file mode 100644 index 0000000..234567b --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Aliyun OSS Filesystem. + */ +package org.apache.hadoop.fs.aliyun.oss; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org