[
https://issues.apache.org/jira/browse/HADOOP-19187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869599#comment-17869599
]
ASF GitHub Bot commented on HADOOP-19187:
-----------------------------------------
anujmodi2021 commented on code in PR #6879:
URL: https://github.com/apache/hadoop/pull/6879#discussion_r1696685002
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java:
##########
@@ -0,0 +1,1308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.utils.Base64;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_BLOB_TYPE;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_OCTET_STREAM;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BREAK_LEASE_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHECK_ACCESS;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_LEASE_BREAK_PERIOD;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILESYSTEM;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FLUSH_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.GET_ACCESS_CONTROL;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.GET_STATUS;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RELEASE_LEASE_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RENEW_LEASE_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SET_ACCESS_CONTROL;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SET_PROPERTIES_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_EXISTING_RESOURCE_TYPE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_DURATION;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ID;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPERTIES;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RANGE_GET_CONTENT_MD5;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RENAME_SOURCE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_FS_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOBTYPE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CONTINUATION;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_DIRECTORY;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_FLUSH;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MAXRESULTS;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PAGINATED;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RECURSIVE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESOURCE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RETAIN_UNCOMMITTED_DATA;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
+
+/**
+ * AbfsClient interacting with the DFS Endpoint.
+ */
+public class AbfsDfsClient extends AbfsClient implements Closeable {
+
+ public AbfsDfsClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final AccessTokenProvider tokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext) throws IOException {
+ super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
+ encryptionContextProvider, abfsClientContext);
+ }
+
+ public AbfsDfsClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final SASTokenProvider sasTokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext) throws IOException {
+ super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
+ encryptionContextProvider, abfsClientContext);
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
+
+ /**
+ * Create request headers for Rest Operation using the default API version.
+ * @return default request headers.
+ */
+ @Override
+ public List<AbfsHttpHeader> createDefaultHeaders() {
+ return this.createDefaultHeaders(getxMsVersion());
+ }
+
+ /**
+ * Create request headers for Rest Operation using the specified API version.
+ * DFS Endpoint API responses are in JSON/Stream format.
+ * @param xMsVersion API version to be used.
+ * @return default request headers.
+ */
+ @Override
+ public List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion) {
+ List<AbfsHttpHeader> requestHeaders =
super.createCommonHeaders(xMsVersion);
+ requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON
+ + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM));
+ return requestHeaders;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/filesystem/create">
+ * Filesystem - Create</a>.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation createFilesystem(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.CreateFileSystem,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/filesystem/set-properties">
+ * Filesystem - Set Properties</a>.
+ * @param properties comma separated list of metadata key-value pairs.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation setFilesystemProperties(final Hashtable<String,
String> properties,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final String commaSeparatedProperties;
+ try {
+ commaSeparatedProperties =
convertXmsPropertiesToCommaSeparatedString(properties);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ // JDK7 does not support PATCH, so to work around the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+ HTTP_METHOD_PATCH));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES,
commaSeparatedProperties));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.SetFileSystemProperties,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/filesystem/get-properties">
+ * Filesystem - Get Properties</a>.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ * */
+ @Override
+ public AbfsRestOperation getFilesystemProperties(TracingContext
tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.GetFileSystemProperties,
+ HTTP_METHOD_HEAD, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/filesystem/delete">
+ * Filesystem - Delete</a>.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.DeleteFileSystem,
+ HTTP_METHOD_DELETE, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/list">
+ * Filesystem - List</a>.
+ * List paths and their properties in the current filesystem.
+ * @param relativePath to return only blobs within this directory.
+ * @param recursive to return all blobs in the path, including those in
subdirectories.
+ * @param listMaxResults maximum number of blobs to return.
+ * @param continuation marker to specify the continuation token.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation or response
parsing fails.
+ */
+ @Override
+ public AbfsRestOperation listPath(final String relativePath,
+ final boolean recursive,
+ final int listMaxResults,
+ final String continuation,
+ TracingContext tracingContext) throws IOException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY,
+ getDirectoryQueryParameter(relativePath));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE,
String.valueOf(recursive));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS,
+ String.valueOf(listMaxResults));
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN,
+ String.valueOf(getAbfsConfiguration().isUpnUsed()));
+ appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION,
+ abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.ListPaths,
+ HTTP_METHOD_GET, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create">
+ * Path - Create</a>.
+ * Create a path (file or directory) in the current filesystem.
+ * @param path to be created inside the filesystem.
+ * @param isFile to specify if the created path is file or directory.
+ * @param overwrite to specify if the path should be overwritten if it
already exists.
+ * @param permissions to specify the permissions of the path.
+ * @param isAppendBlob to specify if the path to be created is an append
blob.
+ * @param eTag to specify conditional headers.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation createPath(final String path,
+ final boolean isFile,
+ final boolean overwrite,
+ final AzureBlobFileSystemStore.Permissions permissions,
+ final boolean isAppendBlob,
+ final String eTag,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException
{
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ if (isFile) {
+ addEncryptionKeyRequestHeaders(path, requestHeaders, true,
+ contextEncryptionAdapter, tracingContext);
+ }
+ if (!overwrite) {
+ requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
+ }
+
+ if (permissions.hasPermission()) {
+ requestHeaders.add(new
AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS,
+ permissions.getPermission()));
+ }
+
+ if (permissions.hasUmask()) {
+ requestHeaders.add(new
AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK,
+ permissions.getUmask()));
+ }
+
+ if (eTag != null && !eTag.isEmpty()) {
+ requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+ }
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE :
DIRECTORY);
+ if (isAppendBlob) {
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE);
+ }
+
+ String operation = isFile
+ ? SASTokenProvider.CREATE_FILE_OPERATION
+ : SASTokenProvider.CREATE_DIRECTORY_OPERATION;
+ appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.CreatePath,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ try {
+ op.execute(tracingContext);
+ } catch (AzureBlobFileSystemException ex) {
+ // If we have no HTTP response, throw the original exception.
+ if (!op.hasResult()) {
+ throw ex;
+ }
+ if (!isFile && op.getResult().getStatusCode() ==
HttpURLConnection.HTTP_CONFLICT) {
+ String existingResource =
+ op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE);
+ if (existingResource != null && existingResource.equals(DIRECTORY)) {
+ return op; //don't throw ex on mkdirs for existing directory
+ }
+ }
+ throw ex;
+ }
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease">
+ * Path - Lease</a>.
+ * Acquire lease on specified path.
+ * @param path on which lease has to be acquired.
+ * @param duration for which lease has to be acquired.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation acquireLease(final String path, final int duration,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION,
ACQUIRE_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION,
Integer.toString(duration)));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID,
+ UUID.randomUUID().toString()));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeasePath,
+ HTTP_METHOD_POST, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease">
+ * Path - Lease</a>.
+ * Renew lease on specified path.
+ * @param path on which lease has to be renewed.
+ * @param leaseId of the lease to be renewed.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation renewLease(final String path, final String leaseId,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION,
RENEW_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeasePath,
+ HTTP_METHOD_POST, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease">
+ * Path - Lease</a>.
+ * Release lease on specified path.
+ * @param path on which lease has to be released.
+ * @param leaseId of the lease to be released.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation releaseLease(final String path, final String
leaseId,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION,
RELEASE_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeasePath,
+ HTTP_METHOD_POST, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease">
+ * Path - Lease</a>.
+ * Break lease on specified path.
+ * @param path on which lease has to be broken.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation breakLease(final String path,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION,
BREAK_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD,
+ DEFAULT_LEASE_BREAK_PERIOD));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeasePath,
+ HTTP_METHOD_POST, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create">
+ * Path - Create</a>.
+ * @param source path to source file
+ * @param destination destination of rename.
+ * @param continuation continuation.
+ * @param tracingContext for tracing the server calls.
+ * @param sourceEtag etag of source file. may be null or empty
+ * @param isMetadataIncompleteState was there a rename failure due to
incomplete metadata state?
+ * @param isNamespaceEnabled whether namespace enabled account or not
+ * @return executed rest operation containing response from server.
+ * @throws IOException if rest operation fails.
+ */
+ @Override
+ public AbfsClientRenameResult renamePath(
+ final String source,
+ final String destination,
+ final String continuation,
+ final TracingContext tracingContext,
+ String sourceEtag,
+ boolean isMetadataIncompleteState,
+ boolean isNamespaceEnabled) throws IOException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final boolean hasEtag = !isEmpty(sourceEtag);
+
+ boolean shouldAttemptRecovery = isRenameResilience() && isNamespaceEnabled;
+ if (!hasEtag && shouldAttemptRecovery) {
+ // in case eTag is already not supplied to the API
+ // and rename resilience is expected and it is an HNS enabled account
+ // fetch the source etag to be used later in recovery
+ try {
+ final AbfsRestOperation srcStatusOp = getPathStatus(source,
+ false, tracingContext, null);
+ if (srcStatusOp.hasResult()) {
+ final AbfsHttpOperation result = srcStatusOp.getResult();
+ sourceEtag = extractEtagHeader(result);
+ // and update the directory status.
+ boolean isDir = checkIsDir(result);
+ shouldAttemptRecovery = !isDir;
+ LOG.debug(
+ "Retrieved etag of source for rename recovery: {}; isDir={}",
+ sourceEtag, isDir);
+ }
+ } catch (AbfsRestOperationException e) {
+ throw new AbfsRestOperationException(e.getStatusCode(),
+ SOURCE_PATH_NOT_FOUND.getErrorCode(),
+ e.getMessage(), e);
+ }
+
+ }
+
+ String encodedRenameSource = urlEncode(
+ FORWARD_SLASH + this.getFileSystem() + source);
+ if (getAuthType() == AuthType.SAS) {
+ final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
+ appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION,
+ srcQueryBuilder);
+ encodedRenameSource += srcQueryBuilder.toString();
+ }
+
+ LOG.trace("Rename source queryparam added {}", encodedRenameSource);
+ requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE,
encodedRenameSource));
+ requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+ appendSASTokenToQuery(destination,
+ SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(destination,
+ abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = createRenameRestOperation(url,
requestHeaders);
+ try {
+ incrementAbfsRenamePath();
+ op.execute(tracingContext);
+ // AbfsClientResult contains the AbfsOperation, If recovery happened or
+ // not, and the incompleteMetaDataState is true or false.
+ // If we successfully rename a path and isMetadataIncompleteState was
+ // true, then rename was recovered, else it didn't, this is why
+ // isMetadataIncompleteState is used for renameRecovery(as the 2nd
param).
+ return new AbfsClientRenameResult(op, isMetadataIncompleteState,
+ isMetadataIncompleteState);
+ } catch (AzureBlobFileSystemException e) {
+ // If we have no HTTP response, throw the original exception.
+ if (!op.hasResult()) {
+ throw e;
+ }
+
+ // ref: HADOOP-18242. Rename failure occurring due to a rare case of
+ // tracking metadata being in incomplete state.
+ if (op.getResult().getStorageErrorCode()
+ .equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
+ && !isMetadataIncompleteState) {
+ //Logging
+ ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
+ .info(
+ "Rename Failure attempting to resolve tracking metadata state
and retrying.");
+ // rename recovery should be attempted in this case also
+ shouldAttemptRecovery = true;
+ isMetadataIncompleteState = true;
+ String sourceEtagAfterFailure = sourceEtag;
+ if (isEmpty(sourceEtagAfterFailure)) {
+ // Doing a HEAD call resolves the incomplete metadata state and
+ // then we can retry the rename operation.
+ AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
+ tracingContext, null);
+ isMetadataIncompleteState = true;
+ // Extract the sourceEtag, using the status Op, and set it
+ // for future rename recovery.
+ AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
+ sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
+ }
+ renamePath(source, destination, continuation, tracingContext,
+ sourceEtagAfterFailure, isMetadataIncompleteState,
+ isNamespaceEnabled);
+ }
+ // if we get out of the condition without a successful rename, then
+ // it isn't metadata incomplete state issue.
+ isMetadataIncompleteState = false;
+
+ // setting default rename recovery success to false
+ boolean etagCheckSucceeded = false;
+ if (shouldAttemptRecovery) {
+ etagCheckSucceeded = renameIdempotencyCheckOp(
+ source,
+ sourceEtag, op, destination, tracingContext);
+ }
+ if (!etagCheckSucceeded) {
+ // idempotency did not return different result
+ // throw back the exception
+ throw e;
+ }
+ return new AbfsClientRenameResult(op, true, isMetadataIncompleteState);
+ }
+ }
+
/**
 * Get Rest Operation for API
 * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
 * Path - Update</a>.
 * Uploads data to be appended to a file.
 * <p>
 * Failure handling, in order of evaluation:
 * <ol>
 *   <li>A user error (4xx) on a request that carried "Expect: 100-continue"
 *   is retried once with the expect header disabled, since the server may
 *   not populate the response message correctly in that case.</li>
 *   <li>An MD5 checksum mismatch is rethrown as
 *   {@code AbfsInvalidChecksumException}.</li>
 *   <li>For append blobs, an idempotency probe may convert the failure into
 *   success if the data turns out to be already committed.</li>
 * </ol>
 * @param path to which data has to be appended.
 * @param buffer containing data to be appended.
 * @param reqParams containing parameters for append operation like offset, length etc.
 * @param cachedSasToken to be used for the authenticating operation.
 * @param contextEncryptionAdapter to provide encryption context.
 * @param tracingContext for tracing the server calls.
 * @return executed rest operation containing response from server.
 * @throws AzureBlobFileSystemException if rest operation fails.
 */
@Override
public AbfsRestOperation append(final String path,
    final byte[] buffer,
    AppendRequestParameters reqParams,
    final String cachedSasToken,
    ContextEncryptionAdapter contextEncryptionAdapter,
    TracingContext tracingContext) throws AzureBlobFileSystemException {
  final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
  addEncryptionKeyRequestHeaders(path, requestHeaders, false,
      contextEncryptionAdapter, tracingContext);
  if (reqParams.isExpectHeaderEnabled()) {
    requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
  }
  // JDK7 does not support PATCH, so to workaround the issue we will use
  // PUT and specify the real method in the X-Http-Method-Override header.
  requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
  if (reqParams.getLeaseId() != null) {
    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId()));
  }

  final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
  abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
  abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
      Long.toString(reqParams.getPosition()));

  // Flush (and optionally close) in the same request when the caller asked
  // for a combined append+flush mode.
  if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || (
      reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) {
    abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE);
    if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) {
      abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE);
    }
  }

  // Check if the retry is with "Expect: 100-continue" header being present
  // in the previous request.
  if (reqParams.isRetryDueToExpect()) {
    String userAgentRetry = getUserAgent();
    // Remove the specific marker related to "Expect: 100-continue" from the
    // User-Agent string so the telemetry reflects the retried request shape.
    userAgentRetry = userAgentRetry.replace(HUNDRED_CONTINUE_USER_AGENT, EMPTY_STRING);
    requestHeaders.removeIf(header -> header.getName().equalsIgnoreCase(USER_AGENT));
    requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry));
  }

  // Add MD5 Hash of request content as request header if feature is enabled
  if (isChecksumValidationEnabled()) {
    addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer);
  }

  // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
  String sasTokenForReuse = appendSASTokenToQuery(path,
      SASTokenProvider.WRITE_OPERATION,
      abfsUriQueryBuilder, cachedSasToken);

  final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
  final AbfsRestOperation op = getAbfsRestOperation(
      AbfsRestOperationType.Append,
      HTTP_METHOD_PUT, url, requestHeaders,
      buffer, reqParams.getoffset(), reqParams.getLength(),
      sasTokenForReuse);
  try {
    op.execute(tracingContext);
  } catch (AbfsRestOperationException e) {
    /*
      If the http response code indicates a user error we retry
      the same append request with expect header being disabled.
      When "100-continue" header is enabled but a non Http 100 response comes,
      the response message might not get set correctly by the server.
      So, this handling is to avoid breaking of backward compatibility
      if someone has taken dependency on the exception message,
      which is created using the error string present in the response header.
    */
    int responseStatusCode = e.getStatusCode();
    if (checkUserError(responseStatusCode)
        && reqParams.isExpectHeaderEnabled()) {
      LOG.debug(
          "User error, retrying without 100 continue enabled for the given path {}",
          path);
      reqParams.setExpectHeaderEnabled(false);
      reqParams.setRetryDueToExpect(true);
      // Single self-recursive retry: the flags flipped above guarantee the
      // retried request cannot take this branch again.
      return this.append(path, buffer, reqParams, cachedSasToken,
          contextEncryptionAdapter, tracingContext);
    }
    // If we have no HTTP response, throw the original exception.
    if (!op.hasResult()) {
      throw e;
    }

    if (isMd5ChecksumError(e)) {
      throw new AbfsInvalidChecksumException(e);
    }

    // Append-blob idempotency: if the target already contains the data up to
    // position + length, treat the failed request as a success.
    if (reqParams.isAppendBlob()
        && appendSuccessCheckOp(op, path,
        (reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
      final AbfsRestOperation successOp = getAbfsRestOperation(
          AbfsRestOperationType.Append,
          HTTP_METHOD_PUT, url, requestHeaders,
          buffer, reqParams.getoffset(), reqParams.getLength(),
          sasTokenForReuse);
      successOp.hardSetResult(HttpURLConnection.HTTP_OK);
      return successOp;
    }
    throw e;
  } catch (AzureBlobFileSystemException e) {
    // Any server side issue will be returned as AbfsRestOperationException
    // and will be handled above.
    LOG.debug(
        "Append request failed with non server issues for path: {}, offset: {}, position: {}",
        path, reqParams.getoffset(), reqParams.getPosition());
    throw e;
  }

  return op;
}
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+ * Path - Update</a>.
+ * Flush previously uploaded data to a file.
+ * @param path on which data has to be flushed.
+ * @param position to which data has to be flushed.
+ * @param retainUncommittedData whether to retain uncommitted data after
flush.
+ * @param isClose specify if this is the last flush to the file.
+ * @param cachedSasToken to be used for the authenticating operation.
+ * @param leaseId if there is an active lease on the path.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation flush(final String path,
+ final long position,
+ boolean retainUncommittedData,
+ boolean isClose,
+ final String cachedSasToken,
+ final String leaseId,
+ ContextEncryptionAdapter contextEncryptionAdapter,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+ contextEncryptionAdapter, tracingContext);
+ // JDK7 does not support PATCH, so to workaround the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
+ if (leaseId != null) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+ }
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
Long.toString(position));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA,
+ String.valueOf(retainUncommittedData));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
+ // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
+ String sasTokenForReuse = appendSASTokenToQuery(path,
+ SASTokenProvider.WRITE_OPERATION,
+ abfsUriQueryBuilder, cachedSasToken);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.Flush,
+ HTTP_METHOD_PUT, url, requestHeaders,
+ sasTokenForReuse);
+ op.execute(tracingContext);
+ return op;
+ }
+
/**
 * Flush with a list of block ids is a Blob-endpoint concept; the DFS
 * endpoint tracks uncommitted data by position instead. This overload is
 * therefore always rejected on {@code AbfsDfsClient}.
 * @param buffer containing the block ids to be flushed (unused here).
 * @param path on which data has to be flushed (unused here).
 * @param isClose specify if this is the last flush to the file (unused here).
 * @param cachedSasToken to be used for the authenticating operation (unused here).
 * @param leaseId if there is an active lease on the path (unused here).
 * @param eTag to specify conditional headers (unused here).
 * @param tracingContext for tracing the server calls (unused here).
 * @return never returns; always throws.
 * @throws UnsupportedOperationException always, on the DFS endpoint.
 */
@Override
public AbfsRestOperation flush(byte[] buffer,
    final String path,
    boolean isClose,
    final String cachedSasToken,
    final String leaseId,
    final String eTag,
    final TracingContext tracingContext) throws AzureBlobFileSystemException {
  throw new UnsupportedOperationException(
      "Flush with blockIds not supported on DFS Endpoint");
}
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+ * Path - Update</a>.
+ * Set the properties of a file or directory.
+ * @param path on which properties have to be set.
+ * @param properties comma separated list of metadata key-value pairs.
+ * @param tracingContext for tracing the server calls.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation setPathProperties(final String path,
+ final Hashtable<String, String> properties,
+ final TracingContext tracingContext,
+ final ContextEncryptionAdapter contextEncryptionAdapter)
+ throws AzureBlobFileSystemException {
+ final String commaSeparatedProperties;
+ try {
+ commaSeparatedProperties =
convertXmsPropertiesToCommaSeparatedString(properties);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+ contextEncryptionAdapter, tracingContext);
+ // JDK7 does not support PATCH, so to workaround the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES,
commaSeparatedProperties));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION);
+ appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION,
+ abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.SetPathProperties,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/get-properties">
+ * Path - Get Properties</a>.
+ * Get the properties of a file or directory.
+ * @param path of which properties have to be fetched.
+ * @param includeProperties to include user defined properties.
+ * @param tracingContext for tracing the server calls.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation getPathStatus(final String path,
+ final boolean includeProperties,
+ final TracingContext tracingContext,
+ final ContextEncryptionAdapter contextEncryptionAdapter)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ String operation = SASTokenProvider.GET_PROPERTIES_OPERATION;
+ if (!includeProperties) {
+ // The default action (operation) is implicitly to get properties and
this action requires read permission
+ // because it reads user defined properties. If the action is getStatus
or getAclStatus, then
+ // only traversal (execute) permission is required.
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, GET_STATUS);
+ operation = SASTokenProvider.GET_STATUS_OPERATION;
+ } else {
+ addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+ contextEncryptionAdapter, tracingContext);
+ }
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN,
+ String.valueOf(getAbfsConfiguration().isUpnUsed()));
+ appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.GetPathStatus,
+ HTTP_METHOD_HEAD, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
/**
 * Get Rest Operation for API
 * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read">
 * Path - Read</a>.
 * Read the contents of the file at specified path.
 * @param path of the file to be read.
 * @param position in the file from where data has to be read.
 * @param buffer to store the data read.
 * @param bufferOffset offset in the buffer to start storing the data.
 * @param bufferLength length of data to be read.
 * @param eTag to specify conditional headers; the read fails if the file
 *        has been modified since the eTag was obtained.
 * @param cachedSasToken to be used for the authenticating operation.
 * @param contextEncryptionAdapter to provide encryption context.
 * @param tracingContext for tracing the server calls.
 * @return executed rest operation containing response from server.
 * @throws AzureBlobFileSystemException if rest operation fails.
 */
@Override
public AbfsRestOperation read(final String path,
    final long position,
    final byte[] buffer,
    final int bufferOffset,
    final int bufferLength,
    final String eTag,
    String cachedSasToken,
    ContextEncryptionAdapter contextEncryptionAdapter,
    TracingContext tracingContext) throws AzureBlobFileSystemException {
  final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
  addEncryptionKeyRequestHeaders(path, requestHeaders, false,
      contextEncryptionAdapter, tracingContext);
  // Range is inclusive on both ends, hence the -1.
  AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
      String.format("bytes=%d-%d", position, position + bufferLength - 1));
  requestHeaders.add(rangeHeader);
  requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));

  // Add request header to fetch MD5 Hash of data returned by server.
  if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
    requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE));
  }

  final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
  // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
  String sasTokenForReuse = appendSASTokenToQuery(path,
      SASTokenProvider.READ_OPERATION,
      abfsUriQueryBuilder, cachedSasToken);

  final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
  final AbfsRestOperation op = getAbfsRestOperation(
      AbfsRestOperationType.ReadFile,
      HTTP_METHOD_GET, url, requestHeaders,
      buffer, bufferOffset, bufferLength,
      sasTokenForReuse);
  op.execute(tracingContext);

  // Verify the MD5 hash returned by server holds valid on the data received.
  // Re-evaluated with the same inputs as the request-side check above, so
  // verification only runs when the MD5 header was actually requested.
  if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
    verifyCheckSumForRead(buffer, op.getResult(), bufferOffset);
  }

  return op;
}
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/delete">
+ * Path - Delete</a>.
+ * Delete the file or directory at specified path.
+ * @param path to be deleted.
+ * @param recursive if the path is a directory, delete recursively.
+ * @param continuation to specify continuation token.
+ * @param tracingContext for tracing the server calls.
+ * @param isNamespaceEnabled specify if the namespace is enabled.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation deletePath(final String path,
+ final boolean recursive,
+ final String continuation,
+ TracingContext tracingContext,
+ final boolean isNamespaceEnabled) throws AzureBlobFileSystemException {
+ /*
+ * If Pagination is enabled and current API version is old,
+ * use the minimum required version for pagination.
+ * If Pagination is enabled and current API version is later than minimum
required
+ * version for pagination, use current version only as azure service is
backward compatible.
+ * If pagination is disabled, use the current API version only.
+ */
+ final List<AbfsHttpHeader> requestHeaders = (isPaginatedDelete(recursive,
+ isNamespaceEnabled) && getxMsVersion().compareTo(
+ ApiVersion.AUG_03_2023) < 0)
+ ? createDefaultHeaders(ApiVersion.AUG_03_2023)
+ : createDefaultHeaders();
+ final AbfsUriQueryBuilder abfsUriQueryBuilder
+ = createDefaultUriQueryBuilder();
+
+ if (isPaginatedDelete(recursive, isNamespaceEnabled)) {
+ // Add paginated query parameter
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_PAGINATED, TRUE);
+ }
+
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE,
+ String.valueOf(recursive));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+ String operation = recursive
+ ? SASTokenProvider.DELETE_RECURSIVE_OPERATION
+ : SASTokenProvider.DELETE_OPERATION;
+ appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.DeletePath, this,
+ HTTP_METHOD_DELETE, url, requestHeaders, getAbfsConfiguration());
+ try {
+ op.execute(tracingContext);
+ } catch (AzureBlobFileSystemException e) {
+ // If we have no HTTP response, throw the original exception.
+ if (!op.hasResult()) {
+ throw e;
+ }
+ final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op);
+ if (idempotencyOp.getResult().getStatusCode()
+ == op.getResult().getStatusCode()) {
+ // idempotency did not return different result
+ // throw back the exception
+ throw e;
+ } else {
+ return idempotencyOp;
+ }
+ }
+
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+ * Path - Update</a>.
+ * @param path on which owner has to be set.
+ * @param owner to be set.
+ * @param group to be set.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation setOwner(final String path,
+ final String owner,
+ final String group,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ // JDK7 does not support PATCH, so to workaround the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
+ if (owner != null && !owner.isEmpty()) {
+ requestHeaders.add(new
AbfsHttpHeader(HttpHeaderConfigurations.X_MS_OWNER, owner));
+ }
+ if (group != null && !group.isEmpty()) {
+ requestHeaders.add(new
AbfsHttpHeader(HttpHeaderConfigurations.X_MS_GROUP, group));
+ }
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_ACCESS_CONTROL);
+ appendSASTokenToQuery(path, SASTokenProvider.SET_OWNER_OPERATION,
+ abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.SetOwner,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+ * Path - Update</a>.
+ * @param path on which permission has to be set.
+ * @param permission to be set.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation setPermission(final String path,
+ final String permission,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ // JDK7 does not support PATCH, so to workaround the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
+ requestHeaders.add(new AbfsHttpHeader(
+ HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_ACCESS_CONTROL);
+ appendSASTokenToQuery(path, SASTokenProvider.SET_PERMISSION_OPERATION,
+ abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.SetPermissions,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+ * Path - Update</a>.
+ * @param path on which ACL has to be set.
+ * @param aclSpecString to be set.
+ * @param eTag to specify conditional headers. Set only if etag matches.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation setAcl(final String path,
+ final String aclSpecString,
+ final String eTag,
+ TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ // JDK7 does not support PATCH, so to workaround the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+ HTTP_METHOD_PATCH));
+ requestHeaders.add(
+ new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ACL, aclSpecString));
+ if (eTag != null && !eTag.isEmpty()) {
+ requestHeaders.add(
+ new AbfsHttpHeader(IF_MATCH, eTag));
+ }
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder
+ = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION,
+ SET_ACCESS_CONTROL);
+ appendSASTokenToQuery(path, SASTokenProvider.SET_ACL_OPERATION,
+ abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.SetAcl,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
/**
 * Get Rest Operation for API
 * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/get-properties">
 * Path - Get Properties</a>.
 * Retrieves the ACL status (access control list, owner and group) of the
 * file or directory at the specified path.
 * @param path of which the ACL status has to be fetched.
 * @param useUPN whether to use UPN with rest operation.
 * @param tracingContext for tracing the server calls.
 * @return executed rest operation containing response from server.
 * @throws AzureBlobFileSystemException if rest operation fails.
 */
@Override
public AbfsRestOperation getAclStatus(final String path,
    final boolean useUPN,
    TracingContext tracingContext) throws AzureBlobFileSystemException {
  final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();

  final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
  abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, GET_ACCESS_CONTROL);
  // upn controls whether identities are returned as user principal names.
  abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN,
      String.valueOf(useUPN));
  appendSASTokenToQuery(path, SASTokenProvider.GET_ACL_OPERATION,
      abfsUriQueryBuilder);

  final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
  final AbfsRestOperation op = getAbfsRestOperation(
      AbfsRestOperationType.GetAcl,
      HTTP_METHOD_HEAD, url, requestHeaders);
  op.execute(tracingContext);
  return op;
}
+
+ /**
+ * Get Rest Operation for API
+ * <a
href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/get-properties">
+ * Path - Get Properties</a>.
+ * @param path Path for which access check needs to be performed
+ * @param rwx The permission to be checked on the path
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation checkAccess(String path,
+ String rwx,
+ TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS);
+ abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx);
+ appendSASTokenToQuery(path, SASTokenProvider.CHECK_ACCESS_OPERATION,
+ abfsUriQueryBuilder);
+
+ URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.CheckAccess,
+ HTTP_METHOD_HEAD, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Checks if the rest operation results indicate if the path is a directory.
+ * @param result executed rest operation containing response from server.
+ * @return True if the path is a directory, False otherwise.
+ */
+ @Override
+ public boolean checkIsDir(AbfsHttpOperation result) {
+ String resourceType = result.getResponseHeader(
+ HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+ return resourceType != null
Review Comment:
Nice suggestion.
Taken
> ABFS: [FnsOverBlob]Making AbfsClient Abstract for supporting both DFS and
> Blob Endpoint
> ---------------------------------------------------------------------------------------
>
> Key: HADOOP-19187
> URL: https://issues.apache.org/jira/browse/HADOOP-19187
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Anuj Modi
> Assignee: Anuj Modi
> Priority: Major
> Labels: pull-request-available
>
> Azure Services support two different set of APIs.
> Blob:
> [https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api]
>
> DFS:
> [https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/operation-groups]
>
> As per the plan in HADOOP-19179, this task enables ABFS Driver to work with
> both set of APIs as per the requirement.
> Scope of this task is to refactor the ABfsClient so that ABFSStore can choose
> to interact with the client it wants based on the endpoint configured by user.
> The blob endpoint support will remain "Unsupported" until the whole code is
> checked-in and well tested.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]