Revert "HADOOP-13584. hdoop-aliyun: merge HADOOP-12756 branch back" This reverts commit 5707f88d8550346f167e45c2f8c4161eb3957e3a
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d1443988 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d1443988 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d1443988 Branch: refs/heads/trunk Commit: d1443988f809fe6656f60dfed4ee4e0f4844ee5c Parents: 9a44a83 Author: Kai Zheng <kai.zh...@intel.com> Authored: Thu Sep 29 09:18:27 2016 +0800 Committer: Kai Zheng <kai.zh...@intel.com> Committed: Thu Sep 29 09:18:27 2016 +0800 ---------------------------------------------------------------------- .gitignore | 2 - hadoop-project/pom.xml | 22 - .../dev-support/findbugs-exclude.xml | 18 - hadoop-tools/hadoop-aliyun/pom.xml | 154 ------ .../aliyun/oss/AliyunCredentialsProvider.java | 87 --- .../fs/aliyun/oss/AliyunOSSFileSystem.java | 543 ------------------- .../fs/aliyun/oss/AliyunOSSFileSystemStore.java | 516 ------------------ .../fs/aliyun/oss/AliyunOSSInputStream.java | 260 --------- .../fs/aliyun/oss/AliyunOSSOutputStream.java | 111 ---- .../hadoop/fs/aliyun/oss/AliyunOSSUtils.java | 167 ------ .../apache/hadoop/fs/aliyun/oss/Constants.java | 113 ---- .../hadoop/fs/aliyun/oss/package-info.java | 22 - .../site/markdown/tools/hadoop-aliyun/index.md | 294 ---------- .../fs/aliyun/oss/AliyunOSSTestUtils.java | 77 --- .../fs/aliyun/oss/TestAliyunCredentials.java | 78 --- .../oss/TestAliyunOSSFileSystemContract.java | 239 -------- .../oss/TestAliyunOSSFileSystemStore.java | 125 ----- .../fs/aliyun/oss/TestAliyunOSSInputStream.java | 145 ----- .../aliyun/oss/TestAliyunOSSOutputStream.java | 91 ---- .../aliyun/oss/contract/AliyunOSSContract.java | 49 -- .../contract/TestAliyunOSSContractCreate.java | 35 -- .../contract/TestAliyunOSSContractDelete.java | 34 -- .../contract/TestAliyunOSSContractDistCp.java | 44 -- .../TestAliyunOSSContractGetFileStatus.java | 35 -- .../contract/TestAliyunOSSContractMkdir.java | 34 -- .../oss/contract/TestAliyunOSSContractOpen.java | 34 -- .../contract/TestAliyunOSSContractRename.java | 35 -- .../contract/TestAliyunOSSContractRootDir.java | 69 --- .../oss/contract/TestAliyunOSSContractSeek.java | 34 -- .../src/test/resources/contract/aliyun-oss.xml | 115 ---- .../src/test/resources/core-site.xml | 46 -- .../src/test/resources/log4j.properties | 23 - hadoop-tools/hadoop-tools-dist/pom.xml | 6 - hadoop-tools/pom.xml | 1 - 34 files changed, 3658 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 194862b..a5d69d0 100644 --- a/.gitignore +++ b/.gitignore @@ -31,5 +31,3 @@ hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml hadoop-tools/hadoop-azure/src/test/resources/azure-auth-keys.xml patchprocess/ -hadoop-tools/hadoop-aliyun/src/test/resources/auth-keys.xml -hadoop-tools/hadoop-aliyun/src/test/resources/contract-test-options.xml http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 49ea40f..d9a01a0 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -439,12 +439,6 @@ <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-aliyun</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-kms</artifactId> <version>${project.version}</version> <classifier>classes</classifier> @@ -1011,22 +1005,6 @@ <version>4.2.0</version> </dependency> - <dependency> - <groupId>com.aliyun.oss</groupId> - <artifactId>aliyun-sdk-oss</artifactId> - <version>2.2.1</version> - <exclusions> - <exclusion> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </exclusion> - <exclusion> - <groupId>commons-beanutils</groupId> - <artifactId>commons-beanutils</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>xerces</groupId> <artifactId>xercesImpl</artifactId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml deleted file mode 100644 index 40d78d0..0000000 --- a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml +++ /dev/null @@ -1,18 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<FindBugsFilter> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/pom.xml b/hadoop-tools/hadoop-aliyun/pom.xml deleted file mode 100644 index 358b18b..0000000 --- a/hadoop-tools/hadoop-aliyun/pom.xml +++ /dev/null @@ -1,154 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-project</artifactId> - <version>3.0.0-alpha2-SNAPSHOT</version> - <relativePath>../../hadoop-project</relativePath> - </parent> - <artifactId>hadoop-aliyun</artifactId> - <name>Apache Hadoop Aliyun OSS support</name> - <packaging>jar</packaging> - - <properties> - <file.encoding>UTF-8</file.encoding> - <downloadSources>true</downloadSources> - </properties> - - <profiles> - <profile> - <id>tests-off</id> - <activation> - <file> - <missing>src/test/resources/auth-keys.xml</missing> - </file> - </activation> - <properties> - <maven.test.skip>true</maven.test.skip> - </properties> - </profile> - <profile> - <id>tests-on</id> - <activation> - <file> - <exists>src/test/resources/auth-keys.xml</exists> - </file> - </activation> - <properties> - <maven.test.skip>false</maven.test.skip> - </properties> - </profile> - </profiles> - - <build> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - <findbugsXmlOutput>true</findbugsXmlOutput> - <xmlOutput>true</xmlOutput> - <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml - </excludeFilterFile> - <effort>Max</effort> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-project-info-reports-plugin</artifactId> - <configuration> - <dependencyDetailsEnabled>false</dependencyDetailsEnabled> - <dependencyLocationsEnabled>false</dependencyLocationsEnabled> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>deplist</id> - <phase>compile</phase> - <goals> - <goal>list</goal> - </goals> - <configuration> - <!-- build a shellprofile --> - <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>com.aliyun.oss</groupId> - <artifactId>aliyun-sdk-oss</artifactId> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-distcp</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-distcp</artifactId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-tests</artifactId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java deleted file mode 100644 index b46c67a..0000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import com.aliyun.oss.common.auth.Credentials; -import com.aliyun.oss.common.auth.CredentialsProvider; -import com.aliyun.oss.common.auth.DefaultCredentials; -import com.aliyun.oss.common.auth.InvalidCredentialsException; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * Support session credentials for authenticating with Aliyun. - */ -public class AliyunCredentialsProvider implements CredentialsProvider { - private Credentials credentials = null; - - public AliyunCredentialsProvider(Configuration conf) - throws IOException { - String accessKeyId; - String accessKeySecret; - String securityToken; - try { - accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID); - accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET); - } catch (IOException e) { - throw new InvalidCredentialsException(e); - } - - try { - securityToken = AliyunOSSUtils.getValueWithKey(conf, SECURITY_TOKEN); - } catch (IOException e) { - securityToken = null; - } - - if (StringUtils.isEmpty(accessKeyId) - || StringUtils.isEmpty(accessKeySecret)) { - throw new InvalidCredentialsException( - "AccessKeyId and AccessKeySecret should not be null or empty."); - } - - if (StringUtils.isNotEmpty(securityToken)) { - credentials = new DefaultCredentials(accessKeyId, accessKeySecret, - securityToken); - } else { - credentials = new DefaultCredentials(accessKeyId, accessKeySecret); - } - } - - @Override - public void setCredentials(Credentials creds) { - if (creds == null) { - throw new InvalidCredentialsException("Credentials should not be null."); - } - - credentials = creds; - } - - @Override - public Credentials getCredentials() { - if (credentials == null) { - throw new InvalidCredentialsException("Invalid credentials"); - } - - return credentials; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java deleted file mode 100644 index 81e038d..0000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ /dev/null @@ -1,543 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Progressable; - -import com.aliyun.oss.model.OSSObjectSummary; -import com.aliyun.oss.model.ObjectListing; -import com.aliyun.oss.model.ObjectMetadata; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * Implementation of {@link FileSystem} for <a href="https://oss.aliyun.com"> - * Aliyun OSS</a>, used to access OSS blob system in a filesystem style. - */ -public class AliyunOSSFileSystem extends FileSystem { - private static final Logger LOG = - LoggerFactory.getLogger(AliyunOSSFileSystem.class); - private URI uri; - private Path workingDir; - private AliyunOSSFileSystemStore store; - private int maxKeys; - - @Override - public FSDataOutputStream append(Path path, int bufferSize, - Progressable progress) throws IOException { - throw new IOException("Append is not supported!"); - } - - @Override - public void close() throws IOException { - try { - store.close(); - } finally { - super.close(); - } - } - - @Override - public FSDataOutputStream create(Path path, FsPermission permission, - boolean overwrite, int bufferSize, short replication, long blockSize, - Progressable progress) throws IOException { - String key = pathToKey(path); - FileStatus status = null; - - try { - // get the status or throw a FNFE - status = getFileStatus(path); - - // if the thread reaches here, there is something at the path - if (status.isDirectory()) { - // path references a directory - throw new FileAlreadyExistsException(path + " is a directory"); - } - if (!overwrite) { - // path references a file and overwrite is disabled - throw new FileAlreadyExistsException(path + " already exists"); - } - LOG.debug("Overwriting file {}", path); - } catch (FileNotFoundException e) { - // this means the file is not found - } - - return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(), - store, key, progress, statistics), (Statistics)(null)); - } - - @Override - public boolean delete(Path path, boolean recursive) throws IOException { - try { - return innerDelete(getFileStatus(path), recursive); - } catch (FileNotFoundException e) { - LOG.debug("Couldn't delete {} - does not exist", path); - return false; - } - } - - /** - * Delete an object. See {@link #delete(Path, boolean)}. - * - * @param status fileStatus object - * @param recursive if path is a directory and set to - * true, the directory is deleted else throws an exception. In - * case of a file the recursive can be set to either true or false. - * @return true if delete is successful else false. - * @throws IOException due to inability to delete a directory or file. - */ - private boolean innerDelete(FileStatus status, boolean recursive) - throws IOException { - Path f = status.getPath(); - String key = pathToKey(f); - if (status.isDirectory()) { - if (!recursive) { - FileStatus[] statuses = listStatus(status.getPath()); - // Check whether it is an empty directory or not - if (statuses.length > 0) { - throw new IOException("Cannot remove directory " + f + - ": It is not empty!"); - } else { - // Delete empty directory without '-r' - key = AliyunOSSUtils.maybeAddTrailingSlash(key); - store.deleteObject(key); - } - } else { - store.deleteDirs(key); - } - } else { - store.deleteObject(key); - } - - createFakeDirectoryIfNecessary(f); - return true; - } - - private void createFakeDirectoryIfNecessary(Path f) throws IOException { - String key = pathToKey(f); - if (StringUtils.isNotEmpty(key) && !exists(f)) { - LOG.debug("Creating new fake directory at {}", f); - mkdir(pathToKey(f.getParent())); - } - } - - @Override - public FileStatus getFileStatus(Path path) throws IOException { - Path qualifiedPath = path.makeQualified(uri, workingDir); - String key = pathToKey(qualifiedPath); - - // Root always exists - if (key.length() == 0) { - return new FileStatus(0, true, 1, 0, 0, qualifiedPath); - } - - ObjectMetadata meta = store.getObjectMetadata(key); - // If key not found and key does not end with "/" - if (meta == null && !key.endsWith("/")) { - // In case of 'dir + "/"' - key += "/"; - meta = store.getObjectMetadata(key); - } - if (meta == null) { - ObjectListing listing = store.listObjects(key, 1, null, false); - if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) || - CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) { - return new FileStatus(0, true, 1, 0, 0, qualifiedPath); - } else { - throw new FileNotFoundException(path + ": No such file or directory!"); - } - } else if (objectRepresentsDirectory(key, meta.getContentLength())) { - return new FileStatus(0, true, 1, 0, 0, qualifiedPath); - } else { - return new FileStatus(meta.getContentLength(), false, 1, - getDefaultBlockSize(path), meta.getLastModified().getTime(), - qualifiedPath); - } - } - - @Override - public String getScheme() { - return "oss"; - } - - @Override - public URI getUri() { - return uri; - } - - @Override - public Path getWorkingDirectory() { - return workingDir; - } - - @Deprecated - public long getDefaultBlockSize() { - return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT); - } - - @Override - public String getCanonicalServiceName() { - // Does not support Token - return null; - } - - /** - * Initialize new FileSystem. - * - * @param name the uri of the file system, including host, port, etc. - * @param conf configuration of the file system - * @throws IOException IO problems - */ - public void initialize(URI name, Configuration conf) throws IOException { - super.initialize(name, conf); - - uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority()); - workingDir = new Path("/user", - System.getProperty("user.name")).makeQualified(uri, null); - - store = new AliyunOSSFileSystemStore(); - store.initialize(name, conf, statistics); - maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); - setConf(conf); - } - - /** - * Check if OSS object represents a directory. - * - * @param name object key - * @param size object content length - * @return true if object represents a directory - */ - private boolean objectRepresentsDirectory(final String name, - final long size) { - return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L; - } - - /** - * Turn a path (relative or otherwise) into an OSS key. - * - * @param path the path of the file. - * @return the key of the object that represents the file. - */ - private String pathToKey(Path path) { - if (!path.isAbsolute()) { - path = new Path(workingDir, path); - } - - return path.toUri().getPath().substring(1); - } - - private Path keyToPath(String key) { - return new Path("/" + key); - } - - @Override - public FileStatus[] listStatus(Path path) throws IOException { - String key = pathToKey(path); - if (LOG.isDebugEnabled()) { - LOG.debug("List status for path: " + path); - } - - final List<FileStatus> result = new ArrayList<FileStatus>(); - final FileStatus fileStatus = getFileStatus(path); - - if (fileStatus.isDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: doing listObjects for directory " + key); - } - - ObjectListing objects = store.listObjects(key, maxKeys, null, false); - while (true) { - statistics.incrementReadOps(1); - for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { - String objKey = objectSummary.getKey(); - if (objKey.equals(key + "/")) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring: " + objKey); - } - continue; - } else { - Path keyPath = keyToPath(objectSummary.getKey()) - .makeQualified(uri, workingDir); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: fi: " + keyPath); - } - result.add(new FileStatus(objectSummary.getSize(), false, 1, - getDefaultBlockSize(keyPath), - objectSummary.getLastModified().getTime(), keyPath)); - } - } - - for (String prefix : objects.getCommonPrefixes()) { - if (prefix.equals(key + "/")) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring: " + prefix); - } - continue; - } else { - Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd: " + keyPath); - } - result.add(new FileStatus(0, true, 1, 0, 0, keyPath)); - } - } - - if (objects.isTruncated()) { - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: list truncated - getting next batch"); - } - String nextMarker = objects.getNextMarker(); - objects = store.listObjects(key, maxKeys, nextMarker, false); - statistics.incrementReadOps(1); - } else { - break; - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd (not a dir): " + path); - } - result.add(fileStatus); - } - - return result.toArray(new FileStatus[result.size()]); - } - - /** - * Used to create an empty file that represents an empty directory. - * - * @param key directory path - * @return true if directory is successfully created - * @throws IOException - */ - private boolean mkdir(final String key) throws IOException { - String dirName = key; - if (StringUtils.isNotEmpty(key)) { - if (!key.endsWith("/")) { - dirName += "/"; - } - store.storeEmptyFile(dirName); - } - return true; - } - - @Override - public boolean mkdirs(Path path, FsPermission permission) - throws IOException { - try { - FileStatus fileStatus = getFileStatus(path); - - if (fileStatus.isDirectory()) { - return true; - } else { - throw new FileAlreadyExistsException("Path is a file: " + path); - } - } catch (FileNotFoundException e) { - validatePath(path); - String key = pathToKey(path); - return mkdir(key); - } - } - - /** - * Check whether the path is a valid path. - * - * @param path the path to be checked. - * @throws IOException - */ - private void validatePath(Path path) throws IOException { - Path fPart = path.getParent(); - do { - try { - FileStatus fileStatus = getFileStatus(fPart); - if (fileStatus.isDirectory()) { - // If path exists and a directory, exit - break; - } else { - throw new FileAlreadyExistsException(String.format( - "Can't make directory for path '%s', it is a file.", fPart)); - } - } catch (FileNotFoundException fnfe) { - } - fPart = fPart.getParent(); - } while (fPart != null); - } - - @Override - public FSDataInputStream open(Path path, int bufferSize) throws IOException { - final FileStatus fileStatus = getFileStatus(path); - if (fileStatus.isDirectory()) { - throw new FileNotFoundException("Can't open " + path + - " because it is a directory"); - } - - return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store, - pathToKey(path), fileStatus.getLen(), statistics)); - } - - @Override - public boolean rename(Path srcPath, Path dstPath) throws IOException { - if (srcPath.isRoot()) { - // Cannot rename root of file system - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot rename the root of a filesystem"); - } - return false; - } - Path parent = dstPath.getParent(); - while (parent != null && !srcPath.equals(parent)) { - parent = parent.getParent(); - } - if (parent != null) { - return false; - } - FileStatus srcStatus = getFileStatus(srcPath); - FileStatus dstStatus; - try { - dstStatus = getFileStatus(dstPath); - } catch (FileNotFoundException fnde) { - dstStatus = null; - } - if (dstStatus == null) { - // If dst doesn't exist, check whether dst dir exists or not - dstStatus = getFileStatus(dstPath.getParent()); - if (!dstStatus.isDirectory()) { - throw new IOException(String.format( - "Failed to rename %s to %s, %s is a file", srcPath, dstPath, - dstPath.getParent())); - } - } else { - if (srcStatus.getPath().equals(dstStatus.getPath())) { - return !srcStatus.isDirectory(); - } else if (dstStatus.isDirectory()) { - // If dst is a directory - dstPath = new Path(dstPath, srcPath.getName()); - FileStatus[] statuses; - try { - statuses = listStatus(dstPath); - } catch (FileNotFoundException fnde) { - statuses = null; - } - if (statuses != null && statuses.length > 0) { - // If dst exists and not a directory / not empty - throw new FileAlreadyExistsException(String.format( - "Failed to rename %s to %s, file already exists or not empty!", - srcPath, dstPath)); - } - } else { - // If dst is not a directory - throw new FileAlreadyExistsException(String.format( - "Failed to rename %s to %s, file already exists!", srcPath, - dstPath)); - } - } - if (srcStatus.isDirectory()) { - copyDirectory(srcPath, dstPath); - } else { - copyFile(srcPath, dstPath); - } - - return srcPath.equals(dstPath) || delete(srcPath, true); - } - - /** - * Copy file from source path to destination path. - * (the caller should make sure srcPath is a file and dstPath is valid) - * - * @param srcPath source path. - * @param dstPath destination path. - * @return true if file is successfully copied. - */ - private boolean copyFile(Path srcPath, Path dstPath) { - String srcKey = pathToKey(srcPath); - String dstKey = pathToKey(dstPath); - return store.copyFile(srcKey, dstKey); - } - - /** - * Copy a directory from source path to destination path. - * (the caller should make sure srcPath is a directory, and dstPath is valid) - * - * @param srcPath source path. - * @param dstPath destination path. - * @return true if directory is successfully copied. - */ - private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { - String srcKey = AliyunOSSUtils - .maybeAddTrailingSlash(pathToKey(srcPath)); - String dstKey = AliyunOSSUtils - .maybeAddTrailingSlash(pathToKey(dstPath)); - - if (dstKey.startsWith(srcKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot rename a directory to a subdirectory of self"); - } - return false; - } - - store.storeEmptyFile(dstKey); - ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true); - statistics.incrementReadOps(1); - // Copy files from src folder to dst - while (true) { - for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { - String newKey = - dstKey.concat(objectSummary.getKey().substring(srcKey.length())); - store.copyFile(objectSummary.getKey(), newKey); - } - if (objects.isTruncated()) { - String nextMarker = objects.getNextMarker(); - objects = store.listObjects(srcKey, maxKeys, nextMarker, true); - statistics.incrementReadOps(1); - } else { - break; - } - } - return true; - } - - @Override - public void setWorkingDirectory(Path dir) { - this.workingDir = dir; - } - - public AliyunOSSFileSystemStore getStore() { - return store; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java deleted file mode 100644 index 9792a78..0000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java +++ /dev/null @@ -1,516 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs.aliyun.oss; - -import com.aliyun.oss.ClientConfiguration; -import com.aliyun.oss.ClientException; -import com.aliyun.oss.OSSClient; -import com.aliyun.oss.OSSException; -import com.aliyun.oss.common.auth.CredentialsProvider; -import com.aliyun.oss.common.comm.Protocol; -import com.aliyun.oss.model.AbortMultipartUploadRequest; -import com.aliyun.oss.model.CannedAccessControlList; -import com.aliyun.oss.model.CompleteMultipartUploadRequest; -import com.aliyun.oss.model.CompleteMultipartUploadResult; -import com.aliyun.oss.model.CopyObjectResult; -import com.aliyun.oss.model.DeleteObjectsRequest; -import com.aliyun.oss.model.GetObjectRequest; -import com.aliyun.oss.model.InitiateMultipartUploadRequest; -import com.aliyun.oss.model.InitiateMultipartUploadResult; -import com.aliyun.oss.model.ListObjectsRequest; -import com.aliyun.oss.model.ObjectMetadata; -import com.aliyun.oss.model.ObjectListing; -import com.aliyun.oss.model.OSSObjectSummary; -import com.aliyun.oss.model.PartETag; -import com.aliyun.oss.model.PutObjectResult; -import com.aliyun.oss.model.UploadPartCopyRequest; -import com.aliyun.oss.model.UploadPartCopyResult; -import com.aliyun.oss.model.UploadPartRequest; -import com.aliyun.oss.model.UploadPartResult; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * Core implementation of Aliyun OSS Filesystem for Hadoop. - * Provides the bridging logic between Hadoop's abstract filesystem and - * Aliyun OSS. - */ -public class AliyunOSSFileSystemStore { - public static final Logger LOG = - LoggerFactory.getLogger(AliyunOSSFileSystemStore.class); - private FileSystem.Statistics statistics; - private OSSClient ossClient; - private String bucketName; - private long uploadPartSize; - private long multipartThreshold; - private long partSize; - private int maxKeys; - private String serverSideEncryptionAlgorithm; - - public void initialize(URI uri, Configuration conf, - FileSystem.Statistics stat) throws IOException { - statistics = stat; - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY, - MAXIMUM_CONNECTIONS_DEFAULT)); - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY, - SECURE_CONNECTIONS_DEFAULT); - clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); - clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY, - MAX_ERROR_RETRIES_DEFAULT)); - clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY, - ESTABLISH_TIMEOUT_DEFAULT)); - clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY, - SOCKET_TIMEOUT_DEFAULT)); - - String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, ""); - int proxyPort = conf.getInt(PROXY_PORT_KEY, -1); - if (StringUtils.isNotEmpty(proxyHost)) { - clientConf.setProxyHost(proxyHost); - if (proxyPort >= 0) { - clientConf.setProxyPort(proxyPort); - } else { - if (secureConnections) { - LOG.warn("Proxy host set without port. Using HTTPS default 443"); - clientConf.setProxyPort(443); - } else { - LOG.warn("Proxy host set without port. Using HTTP default 80"); - clientConf.setProxyPort(80); - } - } - String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY); - String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY); - if ((proxyUsername == null) != (proxyPassword == null)) { - String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " + - PROXY_PASSWORD_KEY + " set without the other."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - clientConf.setProxyUsername(proxyUsername); - clientConf.setProxyPassword(proxyPassword); - clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY)); - clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY)); - } else if (proxyPort >= 0) { - String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " + - PROXY_HOST_KEY; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - - String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); - CredentialsProvider provider = - AliyunOSSUtils.getCredentialsProvider(conf); - ossClient = new OSSClient(endPoint, provider, clientConf); - uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, - MULTIPART_UPLOAD_SIZE_DEFAULT); - multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, - MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); - partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, - MULTIPART_UPLOAD_SIZE_DEFAULT); - if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) { - partSize = MIN_MULTIPART_UPLOAD_PART_SIZE; - } - serverSideEncryptionAlgorithm = - conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); - - if (uploadPartSize < 5 * 1024 * 1024) { - LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB"); - uploadPartSize = 5 * 1024 * 1024; - } - - if (multipartThreshold < 5 * 1024 * 1024) { - LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); - multipartThreshold = 5 * 1024 * 1024; - } - - if (multipartThreshold > 1024 * 1024 * 1024) { - LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB"); - multipartThreshold = 1024 * 1024 * 1024; - } - - String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT); - if (StringUtils.isNotEmpty(cannedACLName)) { - CannedAccessControlList cannedACL = - CannedAccessControlList.valueOf(cannedACLName); - ossClient.setBucketAcl(bucketName, cannedACL); - } - - maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); - bucketName = uri.getHost(); - } - - /** - * Delete an object, and update write operation statistics. - * - * @param key key to blob to delete. - */ - public void deleteObject(String key) { - ossClient.deleteObject(bucketName, key); - statistics.incrementWriteOps(1); - } - - /** - * Delete a list of keys, and update write operation statistics. - * - * @param keysToDelete collection of keys to delete. - */ - public void deleteObjects(List<String> keysToDelete) { - if (CollectionUtils.isNotEmpty(keysToDelete)) { - DeleteObjectsRequest deleteRequest = - new DeleteObjectsRequest(bucketName); - deleteRequest.setKeys(keysToDelete); - ossClient.deleteObjects(deleteRequest); - statistics.incrementWriteOps(keysToDelete.size()); - } - } - - /** - * Delete a directory from Aliyun OSS. - * - * @param key directory key to delete. - */ - public void deleteDirs(String key) { - key = AliyunOSSUtils.maybeAddTrailingSlash(key); - ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); - listRequest.setPrefix(key); - listRequest.setDelimiter(null); - listRequest.setMaxKeys(maxKeys); - - while (true) { - ObjectListing objects = ossClient.listObjects(listRequest); - statistics.incrementReadOps(1); - List<String> keysToDelete = new ArrayList<String>(); - for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { - keysToDelete.add(objectSummary.getKey()); - } - deleteObjects(keysToDelete); - if (objects.isTruncated()) { - listRequest.setMarker(objects.getNextMarker()); - } else { - break; - } - } - } - - /** - * Return metadata of a given object key. - * - * @param key object key. - * @return return null if key does not exist. - */ - public ObjectMetadata getObjectMetadata(String key) { - try { - return ossClient.getObjectMetadata(bucketName, key); - } catch (OSSException osse) { - return null; - } finally { - statistics.incrementReadOps(1); - } - } - - /** - * Upload an empty file as an OSS object, using single upload. - * - * @param key object key. - * @throws IOException if failed to upload object. - */ - public void storeEmptyFile(String key) throws IOException { - ObjectMetadata dirMeta = new ObjectMetadata(); - byte[] buffer = new byte[0]; - ByteArrayInputStream in = new ByteArrayInputStream(buffer); - dirMeta.setContentLength(0); - try { - ossClient.putObject(bucketName, key, in, dirMeta); - } finally { - in.close(); - } - } - - /** - * Copy an object from source key to destination key. - * - * @param srcKey source key. - * @param dstKey destination key. - * @return true if file is successfully copied. - */ - public boolean copyFile(String srcKey, String dstKey) { - ObjectMetadata objectMeta = - ossClient.getObjectMetadata(bucketName, srcKey); - long contentLength = objectMeta.getContentLength(); - if (contentLength <= multipartThreshold) { - return singleCopy(srcKey, dstKey); - } else { - return multipartCopy(srcKey, contentLength, dstKey); - } - } - - /** - * Use single copy to copy an OSS object. - * (The caller should make sure srcPath is a file and dstPath is valid) - * - * @param srcKey source key. - * @param dstKey destination key. - * @return true if object is successfully copied. - */ - private boolean singleCopy(String srcKey, String dstKey) { - CopyObjectResult copyResult = - ossClient.copyObject(bucketName, srcKey, bucketName, dstKey); - LOG.debug(copyResult.getETag()); - return true; - } - - /** - * Use multipart copy to copy an OSS object. - * (The caller should make sure srcPath is a file and dstPath is valid) - * - * @param srcKey source key. - * @param contentLength data size of the object to copy. - * @param dstKey destination key. - * @return true if success, or false if upload is aborted. - */ - private boolean multipartCopy(String srcKey, long contentLength, - String dstKey) { - long realPartSize = - AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize); - int partNum = (int) (contentLength / realPartSize); - if (contentLength % realPartSize != 0) { - partNum++; - } - InitiateMultipartUploadRequest initiateMultipartUploadRequest = - new InitiateMultipartUploadRequest(bucketName, dstKey); - ObjectMetadata meta = new ObjectMetadata(); - if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - initiateMultipartUploadRequest.setObjectMetadata(meta); - InitiateMultipartUploadResult initiateMultipartUploadResult = - ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); - String uploadId = initiateMultipartUploadResult.getUploadId(); - List<PartETag> partETags = new ArrayList<PartETag>(); - try { - for (int i = 0; i < partNum; i++) { - long skipBytes = realPartSize * i; - long size = (realPartSize < contentLength - skipBytes) ? - realPartSize : contentLength - skipBytes; - UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest(); - partCopyRequest.setSourceBucketName(bucketName); - partCopyRequest.setSourceKey(srcKey); - partCopyRequest.setBucketName(bucketName); - partCopyRequest.setKey(dstKey); - partCopyRequest.setUploadId(uploadId); - partCopyRequest.setPartSize(size); - partCopyRequest.setBeginIndex(skipBytes); - partCopyRequest.setPartNumber(i + 1); - UploadPartCopyResult partCopyResult = - ossClient.uploadPartCopy(partCopyRequest); - statistics.incrementWriteOps(1); - partETags.add(partCopyResult.getPartETag()); - } - CompleteMultipartUploadRequest completeMultipartUploadRequest = - new CompleteMultipartUploadRequest(bucketName, dstKey, - uploadId, partETags); - CompleteMultipartUploadResult completeMultipartUploadResult = - ossClient.completeMultipartUpload(completeMultipartUploadRequest); - LOG.debug(completeMultipartUploadResult.getETag()); - return true; - } catch (OSSException | ClientException e) { - AbortMultipartUploadRequest abortMultipartUploadRequest = - new AbortMultipartUploadRequest(bucketName, dstKey, uploadId); - ossClient.abortMultipartUpload(abortMultipartUploadRequest); - return false; - } - } - - /** - * Upload a file as an OSS object, using single upload. - * - * @param key object key. - * @param file local file to upload. - * @throws IOException if failed to upload object. - */ - public void uploadObject(String key, File file) throws IOException { - File object = file.getAbsoluteFile(); - FileInputStream fis = new FileInputStream(object); - ObjectMetadata meta = new ObjectMetadata(); - meta.setContentLength(object.length()); - if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - try { - PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta); - LOG.debug(result.getETag()); - statistics.incrementWriteOps(1); - } finally { - fis.close(); - } - } - - /** - * Upload a file as an OSS object, using multipart upload. - * - * @param key object key. - * @param file local file to upload. - * @throws IOException if failed to upload object. - */ - public void multipartUploadObject(String key, File file) throws IOException { - File object = file.getAbsoluteFile(); - long dataLen = object.length(); - long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize); - int partNum = (int) (dataLen / realPartSize); - if (dataLen % realPartSize != 0) { - partNum += 1; - } - - InitiateMultipartUploadRequest initiateMultipartUploadRequest = - new InitiateMultipartUploadRequest(bucketName, key); - ObjectMetadata meta = new ObjectMetadata(); - if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - initiateMultipartUploadRequest.setObjectMetadata(meta); - InitiateMultipartUploadResult initiateMultipartUploadResult = - ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); - List<PartETag> partETags = new ArrayList<PartETag>(); - String uploadId = initiateMultipartUploadResult.getUploadId(); - - try { - for (int i = 0; i < partNum; i++) { - // TODO: Optimize this, avoid opening the object multiple times - FileInputStream fis = new FileInputStream(object); - try { - long skipBytes = realPartSize * i; - AliyunOSSUtils.skipFully(fis, skipBytes); - long size = (realPartSize < dataLen - skipBytes) ? - realPartSize : dataLen - skipBytes; - UploadPartRequest uploadPartRequest = new UploadPartRequest(); - uploadPartRequest.setBucketName(bucketName); - uploadPartRequest.setKey(key); - uploadPartRequest.setUploadId(uploadId); - uploadPartRequest.setInputStream(fis); - uploadPartRequest.setPartSize(size); - uploadPartRequest.setPartNumber(i + 1); - UploadPartResult uploadPartResult = - ossClient.uploadPart(uploadPartRequest); - statistics.incrementWriteOps(1); - partETags.add(uploadPartResult.getPartETag()); - } finally { - fis.close(); - } - } - CompleteMultipartUploadRequest completeMultipartUploadRequest = - new CompleteMultipartUploadRequest(bucketName, key, - uploadId, partETags); - CompleteMultipartUploadResult completeMultipartUploadResult = - ossClient.completeMultipartUpload(completeMultipartUploadRequest); - LOG.debug(completeMultipartUploadResult.getETag()); - } catch (OSSException | ClientException e) { - AbortMultipartUploadRequest abortMultipartUploadRequest = - new AbortMultipartUploadRequest(bucketName, key, uploadId); - ossClient.abortMultipartUpload(abortMultipartUploadRequest); - } - } - - /** - * list objects. - * - * @param prefix prefix. - * @param maxListingLength max no. of entries - * @param marker last key in any previous search. - * @param recursive whether to list directory recursively. - * @return a list of matches. - */ - public ObjectListing listObjects(String prefix, int maxListingLength, - String marker, boolean recursive) { - String delimiter = recursive ? null : "/"; - prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix); - ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); - listRequest.setPrefix(prefix); - listRequest.setDelimiter(delimiter); - listRequest.setMaxKeys(maxListingLength); - listRequest.setMarker(marker); - - ObjectListing listing = ossClient.listObjects(listRequest); - statistics.incrementReadOps(1); - return listing; - } - - /** - * Retrieve a part of an object. - * - * @param key the object name that is being retrieved from the Aliyun OSS. - * @param byteStart start position. - * @param byteEnd end position. - * @return This method returns null if the key is not found. - */ - public InputStream retrieve(String key, long byteStart, long byteEnd) { - try { - GetObjectRequest request = new GetObjectRequest(bucketName, key); - request.setRange(byteStart, byteEnd); - return ossClient.getObject(request).getObjectContent(); - } catch (OSSException | ClientException e) { - return null; - } - } - - /** - * Close OSS client properly. - */ - public void close() { - if (ossClient != null) { - ossClient.shutdown(); - ossClient = null; - } - } - - /** - * Clean up all objects matching the prefix. - * - * @param prefix Aliyun OSS object prefix. - */ - public void purge(String prefix) { - String key; - try { - ObjectListing objects = listObjects(prefix, maxKeys, null, true); - for (OSSObjectSummary object : objects.getObjectSummaries()) { - key = object.getKey(); - ossClient.deleteObject(bucketName, key); - } - - for (String dir: objects.getCommonPrefixes()) { - deleteDirs(dir); - } - } catch (OSSException | ClientException e) { - LOG.error("Failed to purge " + prefix); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java deleted file mode 100644 index b87a3a7..0000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileSystem.Statistics; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * The input stream for OSS blob system. - * The class uses multi-part downloading to read data from the object content - * stream. - */ -public class AliyunOSSInputStream extends FSInputStream { - public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class); - private final long downloadPartSize; - private AliyunOSSFileSystemStore store; - private final String key; - private Statistics statistics; - private boolean closed; - private InputStream wrappedStream = null; - private long contentLength; - private long position; - private long partRemaining; - - public AliyunOSSInputStream(Configuration conf, - AliyunOSSFileSystemStore store, String key, Long contentLength, - Statistics statistics) throws IOException { - this.store = store; - this.key = key; - this.statistics = statistics; - this.contentLength = contentLength; - downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, - MULTIPART_DOWNLOAD_SIZE_DEFAULT); - reopen(0); - closed = false; - } - - /** - * Reopen the wrapped stream at give position, by seeking for - * data of a part length from object content stream. - * - * @param pos position from start of a file - * @throws IOException if failed to reopen - */ - private synchronized void reopen(long pos) throws IOException { - long partSize; - - if (pos < 0) { - throw new EOFException("Cannot seek at negative position:" + pos); - } else if (pos > contentLength) { - throw new EOFException("Cannot seek after EOF, contentLength:" + - contentLength + " position:" + pos); - } else if (pos + downloadPartSize > contentLength) { - partSize = contentLength - pos; - } else { - partSize = downloadPartSize; - } - - if (wrappedStream != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Aborting old stream to open at pos " + pos); - } - wrappedStream.close(); - } - - wrappedStream = store.retrieve(key, pos, pos + partSize -1); - if (wrappedStream == null) { - throw new IOException("Null IO stream"); - } - position = pos; - partRemaining = partSize; - } - - @Override - public synchronized int read() throws IOException { - checkNotClosed(); - - if (partRemaining <= 0 && position < contentLength) { - reopen(position); - } - - int tries = MAX_RETRIES; - boolean retry; - int byteRead = -1; - do { - retry = false; - try { - byteRead = wrappedStream.read(); - } catch (Exception e) { - handleReadException(e, --tries); - retry = true; - } - } while (retry); - if (byteRead >= 0) { - position++; - partRemaining--; - } - - if (statistics != null && byteRead >= 0) { - statistics.incrementBytesRead(1); - } - return byteRead; - } - - - /** - * Verify that the input stream is open. Non blocking; this gives - * the last state of the volatile {@link #closed} field. - * - * @throws IOException if the connection is closed. - */ - private void checkNotClosed() throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - } - - @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - checkNotClosed(); - - if (buf == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > buf.length - off) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return 0; - } - - int bytesRead = 0; - // Not EOF, and read not done - while (position < contentLength && bytesRead < len) { - if (partRemaining == 0) { - reopen(position); - } - - int tries = MAX_RETRIES; - boolean retry; - int bytes = -1; - do { - retry = false; - try { - bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead); - } catch (Exception e) { - handleReadException(e, --tries); - retry = true; - } - } while (retry); - - if (bytes > 0) { - bytesRead += bytes; - position += bytes; - partRemaining -= bytes; - } else if (partRemaining != 0) { - throw new IOException("Failed to read from stream. Remaining:" + - partRemaining); - } - } - - if (statistics != null && bytesRead > 0) { - statistics.incrementBytesRead(bytesRead); - } - - // Read nothing, but attempt to read something - if (bytesRead == 0 && len > 0) { - return -1; - } else { - return bytesRead; - } - } - - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - closed = true; - if (wrappedStream != null) { - wrappedStream.close(); - } - } - - @Override - public synchronized int available() throws IOException { - checkNotClosed(); - - long remaining = contentLength - position; - if (remaining > Integer.MAX_VALUE) { - return Integer.MAX_VALUE; - } - return (int)remaining; - } - - @Override - public synchronized void seek(long pos) throws IOException { - checkNotClosed(); - if (position == pos) { - return; - } else if (pos > position && pos < position + partRemaining) { - AliyunOSSUtils.skipFully(wrappedStream, pos - position); - position = pos; - } else { - reopen(pos); - } - } - - @Override - public synchronized long getPos() throws IOException { - checkNotClosed(); - return position; - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - checkNotClosed(); - return false; - } - - private void handleReadException(Exception e, int tries) throws IOException{ - if (tries == 0) { - throw new IOException(e); - } - - LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" + - " connection at position '" + position + "', " + e.getMessage()); - try { - Thread.sleep(100); - } catch (InterruptedException e2) { - LOG.warn(e2.getMessage()); - } - reopen(position); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java deleted file mode 100644 index c75ee18..0000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem.Statistics; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.util.Progressable; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * The output stream for OSS blob system. - * Data will be buffered on local disk, then uploaded to OSS in - * {@link #close()} method. - */ -public class AliyunOSSOutputStream extends OutputStream { - public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class); - private AliyunOSSFileSystemStore store; - private final String key; - private Statistics statistics; - private Progressable progress; - private long partSizeThreshold; - private LocalDirAllocator dirAlloc; - private boolean closed; - private File tmpFile; - private BufferedOutputStream backupStream; - - public AliyunOSSOutputStream(Configuration conf, - AliyunOSSFileSystemStore store, String key, Progressable progress, - Statistics statistics) throws IOException { - this.store = store; - this.key = key; - // The caller cann't get any progress information - this.progress = progress; - this.statistics = statistics; - partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, - MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); - - if (conf.get(BUFFER_DIR_KEY) == null) { - conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss"); - } - dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY); - - tmpFile = dirAlloc.createTmpFileForWrite("output-", - LocalDirAllocator.SIZE_UNKNOWN, conf); - backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile)); - closed = false; - } - - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - closed = true; - if (backupStream != null) { - backupStream.close(); - } - long dataLen = tmpFile.length(); - try { - if (dataLen <= partSizeThreshold) { - store.uploadObject(key, tmpFile); - } else { - store.multipartUploadObject(key, tmpFile); - } - } finally { - if (!tmpFile.delete()) { - LOG.warn("Can not delete file: " + tmpFile); - } - } - } - - - - @Override - public synchronized void flush() throws IOException { - backupStream.flush(); - } - - @Override - public synchronized void write(int b) throws IOException { - backupStream.write(b); - statistics.incrementBytesWritten(1); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java deleted file mode 100644 index cae9749..0000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import java.io.IOException; -import java.io.InputStream; - -import com.aliyun.oss.common.auth.CredentialsProvider; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.ProviderUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * Utility methods for Aliyun OSS code. - */ -final public class AliyunOSSUtils { - private static final Logger LOG = - LoggerFactory.getLogger(AliyunOSSUtils.class); - - private AliyunOSSUtils() { - } - - /** - * Used to get password from configuration. - * - * @param conf configuration that contains password information - * @param key the key of the password - * @return the value for the key - * @throws IOException if failed to get password from configuration - */ - public static String getValueWithKey(Configuration conf, String key) - throws IOException { - try { - final char[] pass = conf.getPassword(key); - if (pass != null) { - return (new String(pass)).trim(); - } else { - return ""; - } - } catch (IOException ioe) { - throw new IOException("Cannot find password option " + key, ioe); - } - } - - /** - * Skip the requested number of bytes or fail if there are no enough bytes - * left. This allows for the possibility that {@link InputStream#skip(long)} - * may not skip as many bytes as requested (most likely because of reaching - * EOF). - * - * @param is the input stream to skip. - * @param n the number of bytes to skip. - * @throws IOException thrown when skipped less number of bytes. - */ - public static void skipFully(InputStream is, long n) throws IOException { - long total = 0; - long cur = 0; - - do { - cur = is.skip(n - total); - total += cur; - } while((total < n) && (cur > 0)); - - if (total < n) { - throw new IOException("Failed to skip " + n + " bytes, possibly due " + - "to EOF."); - } - } - - /** - * Calculate a proper size of multipart piece. If <code>minPartSize</code> - * is too small, the number of multipart pieces may exceed the limit of - * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}. - * - * @param contentLength the size of file. - * @param minPartSize the minimum size of multipart piece. - * @return a revisional size of multipart piece. - */ - public static long calculatePartSize(long contentLength, long minPartSize) { - long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1; - return Math.max(minPartSize, tmpPartSize); - } - - /** - * Create credential provider specified by configuration, or create default - * credential provider if not specified. - * - * @param conf configuration - * @return a credential provider - * @throws IOException on any problem. Class construction issues may be - * nested inside the IOE. - */ - public static CredentialsProvider getCredentialsProvider(Configuration conf) - throws IOException { - CredentialsProvider credentials; - - String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY); - if (StringUtils.isEmpty(className)) { - Configuration newConf = - ProviderUtils.excludeIncompatibleCredentialProviders(conf, - AliyunOSSFileSystem.class); - credentials = new AliyunCredentialsProvider(newConf); - } else { - try { - LOG.debug("Credential provider class is:" + className); - Class<?> credClass = Class.forName(className); - try { - credentials = - (CredentialsProvider)credClass.getDeclaredConstructor( - Configuration.class).newInstance(conf); - } catch (NoSuchMethodException | SecurityException e) { - credentials = - (CredentialsProvider)credClass.getDeclaredConstructor() - .newInstance(); - } - } catch (ClassNotFoundException e) { - throw new IOException(className + " not found.", e); - } catch (NoSuchMethodException | SecurityException e) { - throw new IOException(String.format("%s constructor exception. A " + - "class specified in %s must provide an accessible constructor " + - "accepting URI and Configuration, or an accessible default " + - "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), - e); - } catch (ReflectiveOperationException | IllegalArgumentException e) { - throw new IOException(className + " instantiation exception.", e); - } - } - - return credentials; - } - - /** - * Turns a path (relative or otherwise) into an OSS key, adding a trailing - * "/" if the path is not the root <i>and</i> does not already have a "/" - * at the end. - * - * @param key OSS key or "" - * @return the with a trailing "/", or, if it is the root key, "". - */ - public static String maybeAddTrailingSlash(String key) { - if (StringUtils.isNotEmpty(key) && !key.endsWith("/")) { - return key + '/'; - } else { - return key; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java deleted file mode 100644 index 04a2ccd..0000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -/** - * ALL configuration constants for OSS filesystem. - */ -public final class Constants { - - private Constants() { - } - - // Class of credential provider - public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY = - "fs.oss.credentials.provider"; - - // OSS access verification - public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId"; - public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret"; - public static final String SECURITY_TOKEN = "fs.oss.securityToken"; - - // Number of simultaneous connections to oss - public static final String MAXIMUM_CONNECTIONS_KEY = - "fs.oss.connection.maximum"; - public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32; - - // Connect to oss over ssl - public static final String SECURE_CONNECTIONS_KEY = - "fs.oss.connection.secure.enabled"; - public static final boolean SECURE_CONNECTIONS_DEFAULT = true; - - // Use a custom endpoint - public static final String ENDPOINT_KEY = "fs.oss.endpoint"; - - // Connect to oss through a proxy server - public static final String PROXY_HOST_KEY = "fs.oss.proxy.host"; - public static final String PROXY_PORT_KEY = "fs.oss.proxy.port"; - public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username"; - public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password"; - public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain"; - public static final String PROXY_WORKSTATION_KEY = - "fs.oss.proxy.workstation"; - - // Number of times we should retry errors - public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum"; - public static final int MAX_ERROR_RETRIES_DEFAULT = 20; - - // Time until we give up trying to establish a connection to oss - public static final String ESTABLISH_TIMEOUT_KEY = - "fs.oss.connection.establish.timeout"; - public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000; - - // Time until we give up on a connection to oss - public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout"; - public static final int SOCKET_TIMEOUT_DEFAULT = 200000; - - // Number of records to get while paging through a directory listing - public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum"; - public static final int MAX_PAGING_KEYS_DEFAULT = 1000; - - // Size of each of or multipart pieces in bytes - public static final String MULTIPART_UPLOAD_SIZE_KEY = - "fs.oss.multipart.upload.size"; - - public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024; - public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000; - - // Minimum size in bytes before we start a multipart uploads or copy - public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY = - "fs.oss.multipart.upload.threshold"; - public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT = - 20 * 1024 * 1024; - - public static final String MULTIPART_DOWNLOAD_SIZE_KEY = - "fs.oss.multipart.download.size"; - - public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024; - - // Comma separated list of directories - public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir"; - - // private | public-read | public-read-write - public static final String CANNED_ACL_KEY = "fs.oss.acl.default"; - public static final String CANNED_ACL_DEFAULT = ""; - - // OSS server-side encryption - public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY = - "fs.oss.server-side-encryption-algorithm"; - - public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size"; - public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024; - public static final String FS_OSS = "oss"; - - public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L; - public static final int MAX_RETRIES = 10; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java deleted file mode 100644 index 234567b..0000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Aliyun OSS Filesystem. - */ -package org.apache.hadoop.fs.aliyun.oss; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org