http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java new file mode 100644 index 0000000..c17a5c1 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -0,0 +1,402 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; +import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * AbfsClient + */ +public class AbfsClient { + public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); + private final URL baseUrl; + private final SharedKeyCredentials sharedKeyCredentials; + private final String xMsVersion = "2018-03-28"; + private final ExponentialRetryPolicy retryPolicy; + private final String filesystem; + private final ConfigurationService configurationService; + private final String userAgent; + + public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, + final ConfigurationService configurationService, + final ExponentialRetryPolicy exponentialRetryPolicy) { + this.baseUrl = baseUrl; + this.sharedKeyCredentials = sharedKeyCredentials; + String baseUrlString = baseUrl.toString(); + this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(AbfsHttpConstants.FORWARD_SLASH) + 1); + this.configurationService = configurationService; + this.retryPolicy = exponentialRetryPolicy; + this.userAgent = initializeUserAgent(); + } + + public String getFileSystem() { + return filesystem; + } + + ExponentialRetryPolicy getRetryPolicy() { + return retryPolicy; + } + + SharedKeyCredentials getSharedKeyCredentials() { + return sharedKeyCredentials; + } + + List<AbfsHttpHeader> createDefaultHeaders() { + final List<AbfsHttpHeader> requestHeaders 
= new ArrayList<AbfsHttpHeader>(); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_VERSION, xMsVersion)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT, AbfsHttpConstants.APPLICATION_JSON + + AbfsHttpConstants.COMMA + AbfsHttpConstants.SINGLE_WHITE_SPACE + AbfsHttpConstants.APPLICATION_OCTET_STREAM)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT_CHARSET, + AbfsHttpConstants.UTF_8)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, AbfsHttpConstants.EMPTY_STRING)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.USER_AGENT, userAgent)); + return requestHeaders; + } + + AbfsUriQueryBuilder createDefaultUriQueryBuilder() { + final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_TIMEOUT, AbfsHttpConstants.DEFAULT_TIMEOUT); + return abfsUriQueryBuilder; + } + + public AbfsRestOperation createFilesystem() throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_PUT, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation setFilesystemProperties(final String properties) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE, + AbfsHttpConstants.HTTP_METHOD_PATCH)); + + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES, + properties)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_PUT, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation listPath(final String relativePath, final boolean recursive, final int listMaxResults, + final String continuation) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_DIRECTORY, relativePath == null ? 
"" : urlEncode(relativePath)); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults)); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_GET, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation getFilesystemProperties() throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_HEAD, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_DELETE, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite) + throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + if (!overwrite) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, "*")); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, isFile ? 
AbfsHttpConstants.FILE : AbfsHttpConstants.DIRECTORY); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_PUT, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation renamePath(final String source, final String destination, final String continuation) + throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final String encodedRenameSource = urlEncode(AbfsHttpConstants.FORWARD_SLASH + this.getFileSystem() + source); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_RENAME_SOURCE, encodedRenameSource)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, AbfsHttpConstants.STAR)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation); + + final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_PUT, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset, + final int length) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE, + AbfsHttpConstants.HTTP_METHOD_PATCH)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.APPEND_ACTION); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position)); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_PUT, + url, + requestHeaders, buffer, offset, length); + op.execute(); + return op; + } + + + public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. 
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE, + AbfsHttpConstants.HTTP_METHOD_PATCH)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.FLUSH_ACTION); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position)); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData)); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_PUT, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation setPathProperties(final String path, final String properties) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE, + AbfsHttpConstants.HTTP_METHOD_PATCH)); + + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES, properties)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_PROPERTIES_ACTION); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_PUT, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation getPathProperties(final String path) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_HEAD, + url, + requestHeaders); + op.execute(); + return op; + } + + public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset, + final int bufferLength, final String eTag) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.RANGE, + String.format("bytes=%d-%d", position, position + bufferLength - 1))); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_GET, + url, + requestHeaders, + buffer, + bufferOffset, + bufferLength); + op.execute(); + + return op; + } + + public AbfsRestOperation deletePath(final String path, final boolean recursive, final String continuation) + throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); + 
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + this, + AbfsHttpConstants.HTTP_METHOD_DELETE, + url, + requestHeaders); + op.execute(); + return op; + } + + private URL createRequestUrl(final String query) throws AzureBlobFileSystemException { + return createRequestUrl(AbfsHttpConstants.EMPTY_STRING, query); + } + + private URL createRequestUrl(final String path, final String query) + throws AzureBlobFileSystemException { + final String base = baseUrl.toString(); + String encodedPath = path; + try { + encodedPath = urlEncode(path); + } catch (AzureBlobFileSystemException ex) { + this.LOG.debug( + "Unexpected error.", ex); + } + + final StringBuilder sb = new StringBuilder(); + sb.append(base); + sb.append(encodedPath); + sb.append(query); + + final URL url; + try { + url = new URL(sb.toString()); + } catch (MalformedURLException ex) { + throw new InvalidUriException(sb.toString()); + } + return url; + } + + private static String urlEncode(final String value) throws AzureBlobFileSystemException { + String encodedString = null; + try { + encodedString = URLEncoder.encode(value, AbfsHttpConstants.UTF_8) + .replace(AbfsHttpConstants.PLUS, AbfsHttpConstants.PLUS_ENCODE) + .replace(AbfsHttpConstants.FORWARD_SLASH_ENCODE, AbfsHttpConstants.FORWARD_SLASH); + } catch (UnsupportedEncodingException ex) { + throw new InvalidUriException(value); + } + + return encodedString; + } + + private String initializeUserAgent() { + final String userAgentComment = String.format(Locale.ROOT, + "(JavaJRE %s; %s %s)", + System.getProperty(AbfsHttpConstants.JAVA_VERSION), + System.getProperty(AbfsHttpConstants.OS_NAME) + .replaceAll(AbfsHttpConstants.SINGLE_WHITE_SPACE, AbfsHttpConstants.EMPTY_STRING), + System.getProperty(AbfsHttpConstants.OS_VERSION)); + + return String.format(AbfsHttpConstants.CLIENT_VERSION + " %s", userAgentComment); + } +} \ No newline at end of file
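Several of the operations above (setFilesystemProperties, append, flush, setPathProperties) rely on the X-Http-Method-Override workaround described in the inline comments: JDK7's HttpURLConnection refuses PATCH as a request method, so the client sends a PUT and names the intended verb in a header the service honors. A minimal standalone sketch of the pattern follows; the endpoint URL and class name are hypothetical, and the SharedKey signing that real ABFS requests carry is omitted:

import java.net.HttpURLConnection;
import java.net.URL;

public class MethodOverrideSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical ABFS-style endpoint; a real request would also be signed.
    URL url = new URL("https://account.dfs.core.windows.net/fs/file?action=flush&position=0");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    // setRequestMethod("PATCH") throws ProtocolException on JDK7, so issue a
    // PUT and declare the real verb in the X-Http-Method-Override header.
    conn.setRequestMethod("PUT");
    conn.setRequestProperty("X-Http-Method-Override", "PATCH");
    conn.setDoOutput(true);
    conn.setFixedLengthStreamingMode(0); // emits "Content-Length: 0", required by the endpoint
    conn.getOutputStream().close();      // send the empty request body
    System.out.println(conn.getResponseCode() + " " + conn.getResponseMessage());
  }
}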
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java new file mode 100644 index 0000000..9e4c27b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory; +import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; +import org.apache.http.client.utils.URIBuilder; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; + +@Singleton +@InterfaceAudience.Private +@InterfaceStability.Evolving +class AbfsHttpClientFactoryImpl implements AbfsHttpClientFactory { + private final ConfigurationService configurationService; + + @Inject + AbfsHttpClientFactoryImpl( + final ConfigurationService configurationService) { + + Preconditions.checkNotNull(configurationService, "configurationService"); + + this.configurationService = configurationService; + } + + @VisibleForTesting + URIBuilder getURIBuilder(final String hostName, final FileSystem fs) { + final AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs; + + String scheme = FileSystemUriSchemes.HTTP_SCHEME; + + if (abfs.isSecure()) { + scheme = FileSystemUriSchemes.HTTPS_SCHEME; + } + + final URIBuilder uriBuilder = new URIBuilder(); + uriBuilder.setScheme(scheme); + uriBuilder.setHost(hostName); + + return uriBuilder; + } + + public AbfsClient create(final AzureBlobFileSystem fs) throws 
AzureBlobFileSystemException { + final URI uri = fs.getUri(); + final String authority = uri.getRawAuthority(); + if (null == authority) { + throw new InvalidUriAuthorityException(uri.toString()); + } + + if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) { + throw new InvalidUriAuthorityException(uri.toString()); + } + + final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2); + + if (authorityParts.length < 2 || "".equals(authorityParts[0])) { + final String errMsg = String + .format("URI '%s' has a malformed authority, expected container name. " + + "Authority takes the form "+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>", + uri.toString()); + throw new InvalidUriException(errMsg); + } + + final String fileSystemName = authorityParts[0]; + final String accountName = authorityParts[1]; + + final URIBuilder uriBuilder = getURIBuilder(accountName, fs); + + final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName; + + URL baseUrl; + try { + baseUrl = new URL(url); + } catch (MalformedURLException e) { + throw new InvalidUriException(String.format("URI '%s' is malformed", uri.toString())); + } + + SharedKeyCredentials creds = + new SharedKeyCredentials(accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT)), + this.configurationService.getStorageAccountKey(accountName)); + + return new AbfsClient(baseUrl, creds, configurationService, new ExponentialRetryPolicy()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java new file mode 100644 index 0000000..46b4c6d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * The HTTP request and response headers for the REST AbfsClient. + */ +public class AbfsHttpHeader { + private final String name; + private final String value; + + public AbfsHttpHeader(final String name, final String value) { + this.name = name; + this.value = value; + } + + public String getName() { + return name; + } + + public String getValue() { + return value; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java new file mode 100644 index 0000000..0ea9365 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; +import java.util.UUID; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents an HTTP operation.
+ */ + public class AbfsHttpOperation { + private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class); + + private static final int CONNECT_TIMEOUT = 30 * 1000; + private static final int READ_TIMEOUT = 30 * 1000; + + private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024; + + private static final int ONE_THOUSAND = 1000; + private static final int ONE_MILLION = ONE_THOUSAND * ONE_THOUSAND; + + private final String method; + private final URL url; + + private HttpURLConnection connection; + private int statusCode; + private String statusDescription; + private String storageErrorCode = ""; + private String storageErrorMessage = ""; + private String clientRequestId = ""; + private String requestId = ""; + private ListResultSchema listResultSchema = null; + + // metrics + private int bytesSent; + private long bytesReceived; + + // optional trace enabled metrics + private final boolean isTraceEnabled; + private long connectionTimeMs; + private long sendRequestTimeMs; + private long recvResponseTimeMs; + + protected HttpURLConnection getConnection() { + return connection; + } + + public String getMethod() { + return method; + } + + public URL getUrl() { + return url; + } + + public int getStatusCode() { + return statusCode; + } + + public String getStatusDescription() { + return statusDescription; + } + + public String getStorageErrorCode() { + return storageErrorCode; + } + + public String getStorageErrorMessage() { + return storageErrorMessage; + } + + public String getClientRequestId() { + return clientRequestId; + } + + public String getRequestId() { + return requestId; + } + + public int getBytesSent() { + return bytesSent; + } + + public long getBytesReceived() { + return bytesReceived; + } + + public ListResultSchema getListResultSchema() { + return listResultSchema; + } + + public String getResponseHeader(String httpHeader) { + return connection.getHeaderField(httpHeader); + } + + // Returns a trace message for the request + @Override + public String toString() { + final String urlStr = url.toString(); + final StringBuilder sb = new StringBuilder(); + sb.append(statusCode); + sb.append(","); + sb.append(storageErrorCode); + sb.append(",cid="); + sb.append(clientRequestId); + sb.append(",rid="); + sb.append(requestId); + if (isTraceEnabled) { + sb.append(",connMs="); + sb.append(connectionTimeMs); + sb.append(",sendMs="); + sb.append(sendRequestTimeMs); + sb.append(",recvMs="); + sb.append(recvResponseTimeMs); + } + sb.append(",sent="); + sb.append(bytesSent); + sb.append(",recv="); + sb.append(bytesReceived); + sb.append(","); + sb.append(method); + sb.append(","); + sb.append(urlStr); + return sb.toString(); + } + + /** + * Initializes a new HTTP request and opens the connection. + * + * @param url The full URL including query string parameters. + * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). + * @param requestHeaders The HTTP request headers. + * + * @throws IOException if an error occurs.
+ */ + public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders) + throws IOException { + this.isTraceEnabled = this.LOG.isTraceEnabled(); + this.url = url; + this.method = method; + this.clientRequestId = UUID.randomUUID().toString(); + + this.connection = openConnection(); + + this.connection.setConnectTimeout(CONNECT_TIMEOUT); + this.connection.setReadTimeout(READ_TIMEOUT); + + this.connection.setRequestMethod(method); + + for (AbfsHttpHeader header : requestHeaders) { + this.connection.setRequestProperty(header.getName(), header.getValue()); + } + + this.connection.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, clientRequestId); + } + + /** + * Sends the HTTP request. Note that HttpURLConnection requires that an + * empty buffer be sent in order to set the "Content-Length: 0" header, which + * is required by our endpoint. + * + * @param buffer the request entity body. + * @param offset an offset into the buffer where the data begins. + * @param length the length of the data in the buffer. + * + * @throws IOException if an error occurs. + */ + public void sendRequest(byte[] buffer, int offset, int length) throws IOException { + this.connection.setDoOutput(true); + this.connection.setFixedLengthStreamingMode(length); + if (buffer == null) { + // An empty buffer is sent to set the "Content-Length: 0" header, which + // is required by our endpoint. + buffer = new byte[]{}; + offset = 0; + length = 0; + } + + // send the request body + + long startTime = 0; + if (this.isTraceEnabled) { + startTime = System.nanoTime(); + } + try (OutputStream outputStream = this.connection.getOutputStream()) { + // update bytes sent before they are sent so we may observe + // attempted sends as well as successful sends via the + // accompanying statusCode + this.bytesSent = length; + outputStream.write(buffer, offset, length); + } finally { + if (this.isTraceEnabled) { + this.sendRequestTimeMs = elapsedTimeMs(startTime); + } + } + } + + /** + * Gets and processes the HTTP response. + * + * @throws IOException if an error occurs.
+ */ + public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException { + + // get the response + long startTime = 0; + if (this.isTraceEnabled) { + startTime = System.nanoTime(); + } + + this.statusCode = this.connection.getResponseCode(); + + if (this.isTraceEnabled) { + this.recvResponseTimeMs = elapsedTimeMs(startTime); + } + + this.statusDescription = this.connection.getResponseMessage(); + + this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID); + if (this.requestId == null) { + this.requestId = AbfsHttpConstants.EMPTY_STRING; + } + + if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) { + // a HEAD request has no response body to process + return; + } + + if (this.isTraceEnabled) { + startTime = System.nanoTime(); + } + + if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) { + processStorageErrorResponse(); + if (this.isTraceEnabled) { + this.recvResponseTimeMs += elapsedTimeMs(startTime); + } + this.bytesReceived = this.connection.getHeaderFieldLong(HttpHeaderConfigurations.CONTENT_LENGTH, 0); + } else { + // consume the input stream to release resources + int totalBytesRead = 0; + + try (InputStream stream = this.connection.getInputStream()) { + if (isNullInputStream(stream)) { + return; + } + boolean endOfStream = false; + + // this is a list operation and the data needs to be retrieved + // TODO: a better solution is needed + if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method) && buffer == null) { + parseListFilesResponse(stream); + } else { + if (buffer != null) { + while (totalBytesRead < length) { + int bytesRead = stream.read(buffer, offset + totalBytesRead, length - totalBytesRead); + if (bytesRead == -1) { + endOfStream = true; + break; + } + totalBytesRead += bytesRead; + } + } + if (!endOfStream && stream.read() != -1) { + // read and discard + int bytesRead = 0; + byte[] b = new byte[CLEAN_UP_BUFFER_SIZE]; + while ((bytesRead = stream.read(b)) >= 0) { + totalBytesRead += bytesRead; + } + } + } + } catch (IOException ex) { + this.LOG.error("UnexpectedError: ", ex); + throw ex; + } finally { + if (this.isTraceEnabled) { + this.recvResponseTimeMs += elapsedTimeMs(startTime); + } + this.bytesReceived = totalBytesRead; + } + } + } + + + /** + * Open the HTTP connection. + * + * @throws IOException if an error occurs. + */ + private HttpURLConnection openConnection() throws IOException { + if (!isTraceEnabled) { + return (HttpURLConnection) url.openConnection(); + } + long start = System.nanoTime(); + try { + return (HttpURLConnection) url.openConnection(); + } finally { + connectionTimeMs = elapsedTimeMs(start); + } + } + + /** + * When the request fails, this function is used to parse the response + * and extract the storageErrorCode and storageErrorMessage. Any errors + * encountered while attempting to process the error response are logged, + * but otherwise ignored.
+ * + * For storage errors, the response body *usually* has the following format: + * + * { + * "error": + * { + * "code": "string", + * "message": "string" + * } + * } + * + */ + private void processStorageErrorResponse() { + try (InputStream stream = connection.getErrorStream()) { + if (stream == null) { + return; + } + JsonFactory jf = new JsonFactory(); + try (JsonParser jp = jf.createParser(stream)) { + String fieldName, fieldValue; + jp.nextToken(); // START_OBJECT - { + jp.nextToken(); // FIELD_NAME - "error": + jp.nextToken(); // START_OBJECT - { + jp.nextToken(); + while (jp.hasCurrentToken()) { + if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { + fieldName = jp.getCurrentName(); + jp.nextToken(); + fieldValue = jp.getText(); + switch (fieldName) { + case "code": + storageErrorCode = fieldValue; + break; + case "message": + storageErrorMessage = fieldValue; + break; + default: + break; + } + } + jp.nextToken(); + } + } + } catch (IOException ex) { + // Ignore errors that occur while attempting to parse the storage + // error, since the response may have been handled by the HTTP driver + // or for other reasons have an unexpected format. + this.LOG.debug("ExpectedError: ", ex); + } + } + + /** + * Returns the elapsed time in milliseconds. + */ + private long elapsedTimeMs(final long startTime) { + return (System.nanoTime() - startTime) / ONE_MILLION; + } + + /** + * Parse the list files response. + * + * @param stream InputStream containing the list results. + * @throws IOException if the response cannot be deserialized. + */ + private void parseListFilesResponse(final InputStream stream) throws IOException { + if (stream == null) { + return; + } + + if (listResultSchema != null) { + // already parsed the response + return; + } + + try { + final ObjectMapper objectMapper = new ObjectMapper(); + this.listResultSchema = objectMapper.readValue(stream, ListResultSchema.class); + } catch (IOException ex) { + this.LOG.error("Unable to deserialize list results", ex); + throw ex; + } + } + + /** + * Check for a null stream; this exists to pass findbugs' redundant null check. + * @param stream InputStream + */ + private boolean isNullInputStream(InputStream stream) { + return stream == null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java new file mode 100644 index 0000000..06e1a8a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java @@ -0,0 +1,693 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import javax.xml.bind.DatatypeConverter; +import java.io.File; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Set; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; + +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory; +import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; +import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.util.Time.now; + +@Singleton +@InterfaceAudience.Private +@InterfaceStability.Evolving +final class AbfsHttpServiceImpl implements AbfsHttpService { + public static final Logger LOG = LoggerFactory.getLogger(AbfsHttpService.class); + private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'"; + private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1"; + private static final int LIST_MAX_RESULTS = 5000; + private static final int DELETE_DIRECTORY_TIMEOUT_MILISECONDS = 180000; + private static final int 
RENAME_TIMEOUT_MILISECONDS = 180000; + + private final AbfsHttpClientFactory abfsHttpClientFactory; + private final ConcurrentHashMap<AzureBlobFileSystem, AbfsClient> clientCache; + private final ConfigurationService configurationService; + private final Set<String> azureAtomicRenameDirSet; + + @Inject + AbfsHttpServiceImpl( + final ConfigurationService configurationService, + final AbfsHttpClientFactory abfsHttpClientFactory, + final TracingService tracingService) { + Preconditions.checkNotNull(abfsHttpClientFactory, "abfsHttpClientFactory"); + Preconditions.checkNotNull(configurationService, "configurationService"); + Preconditions.checkNotNull(tracingService, "tracingService"); + + this.configurationService = configurationService; + this.clientCache = new ConcurrentHashMap<>(); + this.abfsHttpClientFactory = abfsHttpClientFactory; + this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(configurationService.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA))); + } + + @Override + public Hashtable<String, String> getFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem) + throws AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "getFilesystemProperties for filesystem: {}", + client.getFileSystem()); + + final Hashtable<String, String> parsedXmsProperties; + + final AbfsRestOperation op = client.getFilesystemProperties(); + final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); + + parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); + + return parsedXmsProperties; + } + + @Override + public void setFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem, final Hashtable<String, String> properties) throws + AzureBlobFileSystemException { + if (properties == null || properties.size() == 0) { + return; + } + + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "setFilesystemProperties for filesystem: {} with properties: {}", + client.getFileSystem(), + properties); + + final String commaSeparatedProperties; + try { + commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + client.setFilesystemProperties(commaSeparatedProperties); + } + + @Override + public Hashtable<String, String> getPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws + AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "getPathProperties for filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); + + final Hashtable<String, String> parsedXmsProperties; + final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + + final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); + + parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); + + return parsedXmsProperties; + } + + @Override + public void setPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final Hashtable<String, + String> properties) throws + AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "setPathProperties for filesystem: {} path: {} with properties: {}", +
client.getFileSystem(), + path.toString(), + properties); + + final String commaSeparatedProperties; + try { + commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties); + } + + @Override + public void createFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "createFilesystem for filesystem: {}", + client.getFileSystem()); + + client.createFilesystem(); + } + + @Override + public void deleteFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "deleteFilesystem for filesystem: {}", + client.getFileSystem()); + + client.deleteFilesystem(); + } + + @Override + public OutputStream createFile(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws + AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "createFile filesystem: {} path: {} overwrite: {}", + client.getFileSystem(), + path.toString(), + overwrite); + + client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite); + + final OutputStream outputStream; + outputStream = new FSDataOutputStream( + new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, + configurationService.getWriteBufferSize()), null); + return outputStream; + } + + @Override + public Void createDirectory(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "createDirectory filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); + + client.createPath("/" + getRelativePath(path), false, true); + + return null; + } + + @Override + public InputStream openFileForRead(final AzureBlobFileSystem azureBlobFileSystem, final Path path, + final FileSystem.Statistics statistics) throws AzureBlobFileSystemException { + final AbfsClient client = getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "openFileForRead filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); + + final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + + final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + + if (parseIsDirectory(resourceType)) { + throw new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "openFileForRead must be used with files and not directories", + null); + } + + // Add statistics for InputStream + return new FSDataInputStream( + new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, + configurationService.getReadBufferSize(), configurationService.getReadAheadQueueDepth(), eTag)); + }
+ + @Override + public OutputStream openFileForWrite(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws + AzureBlobFileSystemException { + final AbfsClient client = getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "openFileForWrite filesystem: {} path: {} overwrite: {}", + client.getFileSystem(), + path.toString(), + overwrite); + + final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + + final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + + if (parseIsDirectory(resourceType)) { + throw new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "openFileForWrite must be used with files and not directories", + null); + } + + final long offset = overwrite ? 0 : contentLength; + + final OutputStream outputStream; + outputStream = new FSDataOutputStream( + new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + offset, configurationService.getWriteBufferSize()), null); + return outputStream; + } + + @Override + public void rename(final AzureBlobFileSystem azureBlobFileSystem, final Path source, final Path destination) throws + AzureBlobFileSystemException { + + if (isAtomicRenameKey(source.getName())) { + this.LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename," +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account."); + } + + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "rename filesystem: {} source: {} destination: {}", + client.getFileSystem(), + source.toString(), + destination.toString()); + + String continuation = null; + long deadline = now() + RENAME_TIMEOUT_MILISECONDS; + + do { + if (now() > deadline) { + LOG.debug( + "Rename {} to {} timed out.", + source, + destination); + + throw new TimeoutException("Rename timed out."); + } + + AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source), + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation); + continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + + } while (continuation != null && !continuation.isEmpty()); + } + + @Override + public void delete(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean recursive) throws + AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "delete filesystem: {} path: {} recursive: {}", + client.getFileSystem(), + path.toString(), + String.valueOf(recursive)); + + String continuation = null; + long deadline = now() + DELETE_DIRECTORY_TIMEOUT_MILISECONDS; + + do { + if (now() > deadline) { + this.LOG.debug( + "Delete directory {} timed out.", path); + + throw new TimeoutException("Delete directory timed out."); + } + + AbfsRestOperation op = client.deletePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation); + continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + + } while (continuation != null && !continuation.isEmpty()); + } + + @Override + public FileStatus getFileStatus(final
AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "getFileStatus filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); + + if (path.isRoot()) { + AbfsRestOperation op = client.getFilesystemProperties(); + final long blockSize = configurationService.getAzureBlockSize(); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); + return new VersionedFileStatus( + azureBlobFileSystem.getOwnerUser(), + azureBlobFileSystem.getOwnerUserPrimaryGroup(), + 0, + true, + 1, + blockSize, + parseLastModifiedTime(lastModified).getMillis(), + path, + eTag); + } else { + AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + + final long blockSize = configurationService.getAzureBlockSize(); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); + final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH); + final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + + return new VersionedFileStatus( + azureBlobFileSystem.getOwnerUser(), + azureBlobFileSystem.getOwnerUserPrimaryGroup(), + parseContentLength(contentLength), + parseIsDirectory(resourceType), + 1, + blockSize, + parseLastModifiedTime(lastModified).getMillis(), + path, + eTag); + } + } + + @Override + public FileStatus[] listStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException { + final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); + + this.LOG.debug( + "listStatus filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); + + String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path); + String continuation = null; + ArrayList<FileStatus> fileStatuses = new ArrayList<>(); + + do { + AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation); + continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + ListResultSchema retrievedSchema = op.getResult().getListResultSchema(); + if (retrievedSchema == null) { + throw new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "listStatus path not found", + null, op.getResult()); + } + + long blockSize = configurationService.getAzureBlockSize(); + + for (ListResultEntrySchema entry : retrievedSchema.paths()) { + long lastModifiedMillis = 0; + long contentLength = entry.contentLength() == null ? 0 : entry.contentLength(); + boolean isDirectory = entry.isDirectory() == null ?
false : entry.isDirectory(); + if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { + final DateTime dateTime = DateTime.parse( + entry.lastModified(), + DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); + lastModifiedMillis = dateTime.getMillis(); + } + + fileStatuses.add( + new VersionedFileStatus( + azureBlobFileSystem.getOwnerUser(), + azureBlobFileSystem.getOwnerUserPrimaryGroup(), + contentLength, + isDirectory, + 1, + blockSize, + lastModifiedMillis, + azureBlobFileSystem.makeQualified(new Path(File.separator + entry.name())), + entry.eTag())); + } + + } while (continuation != null && !continuation.isEmpty()); + + return fileStatuses.toArray(new FileStatus[0]); + } + + @Override + public synchronized void closeFileSystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException { + this.clientCache.remove(azureBlobFileSystem); + } + + @Override + public boolean isAtomicRenameKey(String key) { + return isKeyForDirectorySet(key, azureAtomicRenameDirSet); + } + + private String getRelativePath(final Path path) { + Preconditions.checkNotNull(path, "path"); + final String relativePath = path.toUri().getPath(); + + if (relativePath.length() == 0) { + return relativePath; + } + + if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) { + if (relativePath.length() == 1) { + return AbfsHttpConstants.EMPTY_STRING; + } + + return relativePath.substring(1); + } + + return relativePath; + } + + private synchronized AbfsClient getOrCreateClient(final AzureBlobFileSystem azureBlobFileSystem) throws + AzureBlobFileSystemException { + Preconditions.checkNotNull(azureBlobFileSystem, "azureBlobFileSystem"); + + AbfsClient client = this.clientCache.get(azureBlobFileSystem); + + if (client != null) { + return client; + } + + client = abfsHttpClientFactory.create(azureBlobFileSystem); + this.clientCache.put( + azureBlobFileSystem, + client); + return client; + } + + private long parseContentLength(final String contentLength) { + if (contentLength == null) { + return -1; + } + + return Long.parseLong(contentLength); + } + + private boolean parseIsDirectory(final String resourceType) { + return resourceType == null ? 
false : resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); + } + + private DateTime parseLastModifiedTime(final String lastModifiedTime) { + return DateTime.parse( + lastModifiedTime, + DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); + } + + private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws + CharacterCodingException { + StringBuilder commaSeparatedProperties = new StringBuilder(); + + final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder(); + + for (Map.Entry<String, String> propertyEntry : properties.entrySet()) { + String key = propertyEntry.getKey(); + String value = propertyEntry.getValue(); + + Boolean canEncodeValue = encoder.canEncode(value); + if (!canEncodeValue) { + throw new CharacterCodingException(); + } + + String encodedPropertyValue = DatatypeConverter.printBase64Binary(encoder.encode(CharBuffer.wrap(value)).array()); + commaSeparatedProperties.append(key) + .append(AbfsHttpConstants.EQUAL) + .append(encodedPropertyValue); + + commaSeparatedProperties.append(AbfsHttpConstants.COMMA); + } + + if (commaSeparatedProperties.length() != 0) { + commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1); + } + + return commaSeparatedProperties.toString(); + } + + private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws + InvalidFileSystemPropertyException, InvalidAbfsRestOperationException { + Hashtable<String, String> properties = new Hashtable<>(); + + final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder(); + + if (xMsProperties != null && !xMsProperties.isEmpty()) { + String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA); + + if (userProperties.length == 0) { + return properties; + } + + for (String property : userProperties) { + if (property.isEmpty()) { + throw new InvalidFileSystemPropertyException(xMsProperties); + } + + String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2); + if (nameValue.length != 2) { + throw new InvalidFileSystemPropertyException(xMsProperties); + } + + byte[] decodedValue = DatatypeConverter.parseBase64Binary(nameValue[1]); + + final String value; + try { + value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString(); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + properties.put(nameValue[0], value); + } + } + + return properties; + } + + private boolean isKeyForDirectorySet(String key, Set<String> dirSet) { + for (String dir : dirSet) { + if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) { + return true; + } + + try { + URI uri = new URI(dir); + if (null == uri.getAuthority()) { + if (key.startsWith(dir + "/")){ + return true; + } + } + } catch (URISyntaxException e) { + this.LOG.info("URI syntax error creating URI for {}", dir); + } + } + + return false; + } + + private static class VersionedFileStatus extends FileStatus { + private final String version; + + VersionedFileStatus( + final String owner, final String group, + final long length, final boolean isdir, final int blockReplication, + final long blocksize, final long modificationTime, final Path path, + String version) { + super(length, isdir, blockReplication, blocksize, modificationTime, 0, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), + owner, + group, + path); + + this.version = version; + } + + /** Compare if this object is equal to another object. 
+ * @param obj the object to be compared. + * @return true if two file status has the same path name; false if not. + */ + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null) { + return false; + } + + if (this.getClass() == obj.getClass()) { + VersionedFileStatus other = (VersionedFileStatus) obj; + return this.getPath().equals(other.getPath()) && this.version.equals(other.version); + } + + return false; + } + + /** + * Returns a hash code value for the object, which is defined as + * the hash code of the path name. + * + * @return a hash code value for the path name and version + */ + @Override + public int hashCode() { + int hash = getPath().hashCode(); + hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0); + return hash; + } + + /** + * Returns the version of this FileStatus + * + * @return a string value for the FileStatus version + */ + public String getVersion() { + return this.version; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java new file mode 100644 index 0000000..6554380 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
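A note on the listing loop in listStatus above: the service pages results, returning the next page's token in the x-ms-continuation response header, and the do/while loop simply re-issues listPath with the previous token until the header comes back null or empty. The skeleton of that pagination, with a hypothetical fetchPage() stub standing in for client.listPath(...):

    import java.util.ArrayList;
    import java.util.List;

    // Pagination skeleton mirroring the do/while loop in listStatus.
    // fetchPage() is a stand-in for client.listPath(...); the real call returns
    // an AbfsRestOperation whose response carries the next continuation token.
    public class ContinuationLoopSketch {
      static String[] fetchPage(String continuation) {
        // stub: first call returns a token plus two names, second call returns no token
        return continuation == null ? new String[]{"token-1", "a", "b"} : new String[]{null, "c"};
      }

      public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        String continuation = null;
        do {
          String[] page = fetchPage(continuation);
          continuation = page[0]; // x-ms-continuation header; null/empty when done
          for (int i = 1; i < page.length; i++) {
            names.add(page[i]);
          }
        } while (continuation != null && !continuation.isEmpty());
        System.out.println(names); // [a, b, c]
      }
    }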
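The property codec above (convertXmsPropertiesToCommaSeparatedString and parseCommaSeparatedXmsProperties) serializes user metadata as key=Base64(value) pairs joined by commas; Base64-encoding the values keeps embedded commas and '=' characters from corrupting the wire format. A minimal round-trip sketch under two assumptions: that XMS_PROPERTIES_ENCODING resolves to ISO-8859-1 (the constant is defined elsewhere in the patch), and with java.util.Base64 substituted for DatatypeConverter. It omits the validation the real code performs:

    import java.nio.charset.Charset;
    import java.util.Base64;
    import java.util.Hashtable;
    import java.util.Map;

    public class XmsPropertiesCodecSketch {
      // Assumed charset; the real XMS_PROPERTIES_ENCODING constant lives elsewhere in the patch.
      private static final Charset ENCODING = Charset.forName("ISO-8859-1");

      static String encode(Hashtable<String, String> properties) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : properties.entrySet()) {
          if (sb.length() > 0) {
            sb.append(',');
          }
          // Base64 the value so commas and '=' inside it cannot break the format
          sb.append(e.getKey()).append('=')
            .append(Base64.getEncoder().encodeToString(e.getValue().getBytes(ENCODING)));
        }
        return sb.toString();
      }

      static Hashtable<String, String> decode(String xMsProperties) {
        Hashtable<String, String> properties = new Hashtable<>();
        for (String property : xMsProperties.split(",")) {
          String[] nameValue = property.split("=", 2); // split on the first '=' only
          properties.put(nameValue[0], new String(Base64.getDecoder().decode(nameValue[1]), ENCODING));
        }
        return properties;
      }

      public static void main(String[] args) {
        Hashtable<String, String> props = new Hashtable<>();
        props.put("owner", "hadoop");
        System.out.println(encode(props));          // owner=aGFkb29w
        System.out.println(decode(encode(props)));  // {owner=hadoop}
      }
    }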
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.EOFException; +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; + +/** + * The AbfsInputStream for AbfsClient + */ +public class AbfsInputStream extends FSInputStream { + private final AbfsClient client; + private final Statistics statistics; + private final String path; + private final long contentLength; + private final int bufferSize; // default buffer size + private final int readAheadQueueDepth; // initialized in constructor + private final String eTag; // eTag of the path when InputStream are created + private final boolean tolerateOobAppends; // whether tolerate Oob Appends + private final boolean readAheadEnabled; // whether enable readAhead; + + private byte[] buffer = null; // will be initialized on first use + + private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server + private long fCursorAfterLastRead = -1; + private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer + private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 + // of valid bytes in buffer) + private boolean closed = false; + + public AbfsInputStream( + final AbfsClient client, + final Statistics statistics, + final String path, + final long contentLength, + final int bufferSize, + final int readAheadQueueDepth, + final String eTag) { + super(); + this.client = client; + this.statistics = statistics; + this.path = path; + this.contentLength = contentLength; + this.bufferSize = bufferSize; + this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : 2 * Runtime.getRuntime().availableProcessors(); + this.eTag = eTag; + this.tolerateOobAppends = false; + this.readAheadEnabled = true; + } + + public String getPath() { + return path; + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int numberOfBytesRead = read(b, 0, 1); + if (numberOfBytesRead < 0) { + return -1; + } else { + return (b[0] & 0xFF); + } + } + + @Override + public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + int currentOff = off; + int currentLen = len; + int lastReadBytes; + int totalReadBytes = 0; + do { + lastReadBytes = readOneBlock(b, currentOff, currentLen); + if (lastReadBytes > 0) { + currentOff += lastReadBytes; + currentLen -= lastReadBytes; + totalReadBytes += lastReadBytes; + } + if (currentLen <= 0 || currentLen > b.length - currentOff) { + break; + } + } while (lastReadBytes > 0); + return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; + } + + private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + + Preconditions.checkNotNull(b); + + if (len == 0) { + return 0; + } + + if (this.available() == 0) { + return -1; + } + + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + + //If buffer is empty, then fill the buffer. 
+ if (bCursor == limit) { + //If EOF, then return -1 + if (fCursor >= contentLength) { + return -1; + } + + long bytesRead = 0; + //reset buffer to initial state - i.e., throw away existing data + bCursor = 0; + limit = 0; + if (buffer == null) { + buffer = new byte[bufferSize]; + } + + // Enable readAhead when reading sequentially + if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { + bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); + } else { + bytesRead = readInternal(fCursor, buffer, 0, b.length, true); + } + + if (bytesRead == -1) { + return -1; + } + + limit += bytesRead; + fCursor += bytesRead; + fCursorAfterLastRead = fCursor; + } + + //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) + //(bytes returned may be less than requested) + int bytesRemaining = limit - bCursor; + int bytesToRead = Math.min(len, bytesRemaining); + System.arraycopy(buffer, bCursor, b, off, bytesToRead); + bCursor += bytesToRead; + if (statistics != null) { + statistics.incrementBytesRead(bytesToRead); + } + return bytesToRead; + } + + + private int readInternal(final long position, final byte[] b, final int offset, final int length, + final boolean bypassReadAhead) throws IOException { + if (readAheadEnabled && !bypassReadAhead) { + // try reading from read-ahead + if (offset != 0) { + throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); + } + int receivedBytes; + + // queue read-aheads + int numReadAheads = this.readAheadQueueDepth; + long nextSize; + long nextOffset = position; + while (numReadAheads > 0 && nextOffset < contentLength) { + nextSize = Math.min((long) bufferSize, contentLength - nextOffset); + ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); + nextOffset = nextOffset + nextSize; + numReadAheads--; + } + + // try reading from buffers first + receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); + if (receivedBytes > 0) { + return receivedBytes; + } + + // got nothing from read-ahead, do our own read now + receivedBytes = readRemote(position, b, offset, length); + return receivedBytes; + } else { + return readRemote(position, b, offset, length); + } + } + + int readRemote(long position, byte[] b, int offset, int length) throws IOException { + if (position < 0) { + throw new IllegalArgumentException("attempting to read from negative offset"); + } + if (position >= contentLength) { + return -1; // Hadoop prefers -1 to EOFException + } + if (b == null) { + throw new IllegalArgumentException("null byte array passed in to read() method"); + } + if (offset >= b.length) { + throw new IllegalArgumentException("offset greater than length of array"); + } + if (length < 0) { + throw new IllegalArgumentException("requested read length is less than zero"); + } + if (length > (b.length - offset)) { + throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); + } + final AbfsRestOperation op; + try { + op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag); + } catch (AzureBlobFileSystemException ex) { + throw new IOException(ex); + } + long bytesRead = op.getResult().getBytesReceived(); + if (bytesRead > Integer.MAX_VALUE) { + throw new IOException("Unexpected Content-Length"); + } + return (int) bytesRead; + } + + /** + * Seek to given position in stream. 
+
+  /**
+   * Seek to given position in stream.
+   * @param n position to seek to
+   * @throws IOException if there is an error
+   * @throws EOFException if attempting to seek past end of file
+   */
+  @Override
+  public synchronized void seek(long n) throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    if (n < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+    }
+    if (n > contentLength) {
+      throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+    }
+
+    if (n >= fCursor - limit && n <= fCursor) { // within buffer
+      bCursor = (int) (n - (fCursor - limit));
+      return;
+    }
+
+    // next read will read from here
+    fCursor = n;
+
+    // invalidate buffer
+    limit = 0;
+    bCursor = 0;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    long currentPos = getPos();
+    if (currentPos == contentLength) {
+      if (n > 0) {
+        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+      }
+    }
+    long newPos = currentPos + n;
+    if (newPos < 0) {
+      newPos = 0;
+      n = newPos - currentPos;
+    }
+    if (newPos > contentLength) {
+      newPos = contentLength;
+      n = newPos - currentPos;
+    }
+    seek(newPos);
+    return n;
+  }
+
+  /**
+   * Return the size of the remaining available bytes
+   * if the size is less than or equal to {@link Integer#MAX_VALUE},
+   * otherwise, return {@link Integer#MAX_VALUE}.
+   *
+   * This is to match the behavior of DFSInputStream.available(),
+   * which some clients may rely on (HBase write-ahead log reading in
+   * particular).
+   */
+  @Override
+  public synchronized int available() throws IOException {
+    if (closed) {
+      throw new IOException(
+          FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    final long remaining = this.contentLength - this.getPos();
+    return remaining <= Integer.MAX_VALUE
+        ? (int) remaining : Integer.MAX_VALUE;
+  }
+
+  /**
+   * Returns the length of the file that this stream refers to. Note that the length returned is the length
+   * as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
+   * they won't be reflected in the returned length.
+   *
+   * @return length of the file.
+   * @throws IOException if the stream is closed
+   */
+  public long length() throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    return contentLength;
+  }
+
+  /**
+   * Return the current offset from the start of the file.
+   * @throws IOException throws {@link IOException} if there is an error
+   */
+  @Override
+  public synchronized long getPos() throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    return fCursor - limit + bCursor;
+  }
+
+  /**
+   * Seeks a different copy of the data. Returns true if
+   * found a new source, false otherwise.
+   * @throws IOException throws {@link IOException} if there is an error
+   */
+  @Override
+  public boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    closed = true;
+    buffer = null; // de-reference the buffer so it can be GC'ed sooner
+  }
+
+  /**
+   * Not supported by this stream. Throws {@link UnsupportedOperationException}.
+   * @param readlimit ignored
+   */
+  @Override
+  public synchronized void mark(int readlimit) {
+    throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+  }
+
+  /**
+   * Not supported by this stream. Throws {@link UnsupportedOperationException}.
+   */
+  @Override
+  public synchronized void reset() throws IOException {
+    throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+  }
+
+  /**
+   * Gets whether mark and reset are supported by {@code AbfsInputStream}. Always returns false.
+   *
+   * @return always {@code false}
+   */
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+}
\ No newline at end of file
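readInternal above implements a simple sequential read-ahead: before doing its own blocking read, it queues up to readAheadQueueDepth buffer-sized ranges with the shared ReadBufferManager, then first tries to satisfy the request from those buffers. The slicing arithmetic is easy to get wrong at end-of-file, so here is a standalone sketch of just that loop, with assumed sizes (the real values come from configuration), printing the (offset, size) pairs that would be handed to queueReadAhead():

    // Standalone sketch of the read-ahead slicing loop in readInternal.
    public class ReadAheadSliceSketch {
      public static void main(String[] args) {
        long contentLength = 10_000_000L; // assumed file size
        int bufferSize = 4 * 1024 * 1024; // assumed 4 MB read buffer
        int queueDepth = 4;               // assumed queue depth
        long position = 0;

        int numReadAheads = queueDepth;
        long nextOffset = position;
        while (numReadAheads > 0 && nextOffset < contentLength) {
          long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
          System.out.println("queueReadAhead(offset=" + nextOffset + ", size=" + nextSize + ")");
          nextOffset += nextSize;
          numReadAheads--;
        }
        // With these numbers: (0, 4194304), (4194304, 4194304), (8388608, 1611392);
        // only three slices are queued because the third one reaches end-of-file.
      }
    }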
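The stream's position bookkeeping also rewards a closer look: fCursor is the file offset one past the buffered range, limit counts the valid bytes in the buffer, and bCursor is the next byte to hand back. That is why getPos() returns fCursor - limit + bCursor, and why seek() can stay inside the buffer, avoiding any remote I/O, whenever the target falls within [fCursor - limit, fCursor]. A toy model of that arithmetic (names borrowed from the class; not the real implementation):

    // Toy model of AbfsInputStream's cursor arithmetic.
    public class CursorModelSketch {
      long fCursor = 8192; // one past the last byte fetched from the service
      int limit = 4096;    // valid bytes in the buffer, i.e. file range [4096, 8192)
      int bCursor = 100;   // next byte to hand back is buffer[100]

      long getPos() {
        return fCursor - limit + bCursor; // 8192 - 4096 + 100 = 4196
      }

      void seek(long n) {
        if (n >= fCursor - limit && n <= fCursor) {
          // target is already buffered: just move the in-buffer cursor
          bCursor = (int) (n - (fCursor - limit));
        } else {
          // outside the buffer: invalidate it and let the next read fetch from n
          fCursor = n;
          limit = 0;
          bCursor = 0;
        }
      }

      public static void main(String[] args) {
        CursorModelSketch s = new CursorModelSketch();
        System.out.println(s.getPos()); // 4196
        s.seek(5000);                   // within [4096, 8192]: no remote I/O needed
        System.out.println(s.getPos()); // 5000
        s.seek(100000);                 // outside the buffer: invalidated
        System.out.println(s.getPos()); // 100000
      }
    }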