[
https://issues.apache.org/jira/browse/HADOOP-19226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17882939#comment-17882939
]
ASF GitHub Bot commented on HADOOP-19226:
-----------------------------------------
saikatroy038 commented on code in PR #6944:
URL: https://github.com/apache/hadoop/pull/6944#discussion_r1766368969
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -405,7 +405,7 @@ private synchronized boolean getNamespaceEnabledInformationFromServer(
     }
     try {
       LOG.debug("Get root ACL status");
-      getClient().getAclStatus(AbfsHttpConstants.ROOT_PATH, tracingContext);
+      getClient(AbfsServiceType.DFS).getAclStatus(AbfsHttpConstants.ROOT_PATH, tracingContext);
Review Comment:
Why DFS client?
In the case of FNS, can't we just catch the UnsupportedOperationException and
set/return isNamespaceEnabled to false?
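For instance (untested sketch; `setNamespaceEnabled` here is just a stand-in for however the store caches the result):
```java
try {
  LOG.debug("Get root ACL status");
  // ACL status is a DFS-only capability; on FNS/Blob this should fail.
  getClient().getAclStatus(AbfsHttpConstants.ROOT_PATH, tracingContext);
  setNamespaceEnabled(true);   // ACL call succeeded => HNS account
} catch (UnsupportedOperationException ex) {
  setNamespaceEnabled(false);  // ACL unsupported on FNS/Blob => namespace disabled
}
```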
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -0,0 +1,1087 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_OCTET_STREAM;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_XML;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK_TYPE_COMMITTED;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BREAK_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CONTAINER;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_LEASE_BREAK_PERIOD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LEASE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.METADATA;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RELEASE_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RENEW_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_DURATION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_META_HDI_ISFOLDER;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RANGE_GET_CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SOURCE_LEASE_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKLISTTYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE;
+
+/**
+ * AbfsClient interacting with Blob endpoint.
+ */
+public class AbfsBlobClient extends AbfsClient {
+
+ public AbfsBlobClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final AccessTokenProvider tokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext) throws IOException {
+ super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
+ encryptionContextProvider, abfsClientContext);
+ }
+
+ public AbfsBlobClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final SASTokenProvider sasTokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext) throws IOException {
+ super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
+ encryptionContextProvider, abfsClientContext);
+ }
+
+ /**
+ * Create request headers for Rest Operation using the default API version.
+ * @return default request headers.
+ */
+ @Override
+ public List<AbfsHttpHeader> createDefaultHeaders() {
+ return this.createDefaultHeaders(getxMsVersion());
+ }
+
+ /**
+ * Create request headers for Rest Operation using the specified API version.
+ * Blob Endpoint API responses are in JSON/XML format.
+ * @param xMsVersion API version to be used.
+ * @return default request headers
+ */
+ @Override
+ public List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion) {
+    List<AbfsHttpHeader> requestHeaders = super.createCommonHeaders(xMsVersion);
+ requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON
+ + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM
+ + COMMA + SINGLE_WHITE_SPACE + APPLICATION_XML));
+ return requestHeaders;
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/create-container">
+ * Create Container</a>.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation createFilesystem(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.CreateContainer,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/set-container-metadata">
+ * Set Container Metadata</a>.
+ * @param properties comma separated list of metadata key-value pairs.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+  public AbfsRestOperation setFilesystemProperties(final Hashtable<String, String> properties,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ /*
+     * Blob Endpoint supports Unicode characters, but DFS Endpoint only allows ASCII.
+     * To match the behavior across endpoints, the driver throws an exception if non-ASCII characters are found.
+ */
+ try {
+      List<AbfsHttpHeader> metadataRequestHeaders = getMetadataHeadersList(properties);
+ requestHeaders.addAll(metadataRequestHeaders);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+
+ AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, METADATA);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.SetContainerMetadata,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/get-container-properties">
+ * Get Container Metadata</a>.
+ * Gets all the properties of the filesystem.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ * */
+ @Override
+  public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.GetContainerProperties,
+ HTTP_METHOD_HEAD, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/delete-container">
+ * Delete Container</a>.
+ * Deletes the Container acting as current filesystem.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.DeleteContainer,
+ HTTP_METHOD_DELETE, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/list-blobs">
+ * List Blobs</a>.
+   * @param relativePath to return only blobs with names that begin with the specified prefix.
+   * @param recursive to return all blobs in the path, including those in subdirectories.
+ * @param listMaxResults maximum number of blobs to return.
+ * @param continuation marker to specify the continuation token.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation or response parsing fails.
+ */
+ @Override
+ public AbfsRestOperation listPath(final String relativePath,
+ final boolean recursive,
+ final int listMaxResults,
+ final String continuation,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+    // Todo: [FnsOverBlob] To be implemented as part of response handling of blob endpoint APIs.
+    throw new NotImplementedException("Blob Endpoint Support is not yet implemented");
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob">
+ * Put Blob</a>.
+   * Creates a file or directory (marker file) at the specified path.
+ * @param path of the directory to be created.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation createPath(final String path,
+ final boolean isFile,
+ final boolean overwrite,
+ final AzureBlobFileSystemStore.Permissions permissions,
+ final boolean isAppendBlob,
+ final String eTag,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+    // Todo: [FnsOverBlob] To be implemented as part of ingress work over blob endpoint.
+    throw new NotImplementedException("Create Path operation on Blob endpoint yet to be implemented.");
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob">
+ * Lease Blob</a>.
+ * @param path on which lease has to be acquired.
+ * @param duration for which lease has to be acquired.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation acquireLease(final String path, final int duration,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration)));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString()));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob">
+ * Lease Blob</a>.
+ * @param path on which lease has to be renewed.
+ * @param leaseId of the lease to be renewed.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation renewLease(final String path, final String leaseId,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob">
+ * Lease Blob</a>.
+ * @param path on which lease has to be released.
+ * @param leaseId of the lease to be released.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+  public AbfsRestOperation releaseLease(final String path, final String leaseId,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob">
+ * Lease Blob</a>.
+ * @param path on which lease has to be broken.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation breakLease(final String path,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ @Override
+ public AbfsClientRenameResult renamePath(final String source,
+ final String destination,
+ final String continuation,
+ final TracingContext tracingContext,
+ final String sourceEtag,
+ final boolean isMetadataIncompleteState,
+ final boolean isNamespaceEnabled) throws IOException {
+    // Todo: [FnsOverBlob] To be implemented as part of rename-delete over blob endpoint work.
+    throw new NotImplementedException("Rename operation on Blob endpoint yet to be implemented.");
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-block">
+ * Put Block</a>.
+ * Uploads data to be appended to a file.
+ * @param path to which data has to be appended.
+ * @param buffer containing data to be appended.
+   * @param reqParams containing parameters for append operation like offset, length etc.
+ * @param cachedSasToken to be used for the authenticating operation.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation append(final String path,
+ final byte[] buffer,
+ final AppendRequestParameters reqParams,
+ final String cachedSasToken,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+ contextEncryptionAdapter, tracingContext);
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length)));
+ requestHeaders.add(new AbfsHttpHeader(IF_MATCH, reqParams.getETag()));
+ if (reqParams.getLeaseId() != null) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId()));
+ }
+ if (reqParams.isExpectHeaderEnabled()) {
+ requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+ }
+ if (isChecksumValidationEnabled()) {
+ addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer);
+ }
+ if (reqParams.isRetryDueToExpect()) {
+ String userAgentRetry = getUserAgent();
+      userAgentRetry = userAgentRetry.replace(HUNDRED_CONTINUE_USER_AGENT, EMPTY_STRING);
+      requestHeaders.removeIf(header -> header.getName().equalsIgnoreCase(USER_AGENT));
+ requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry));
+ }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCK);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOCKID, reqParams.getBlockId());
+
+    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
+ abfsUriQueryBuilder, cachedSasToken);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.PutBlock,
+ HTTP_METHOD_PUT, url, requestHeaders,
+ buffer, reqParams.getoffset(), reqParams.getLength(),
+ sasTokenForReuse);
+
+ try {
+ op.execute(tracingContext);
+ } catch (AbfsRestOperationException e) {
+      /*
+        If the HTTP response code indicates a user error, we retry
+        the same append request with the Expect header disabled.
+        When the "100-continue" header is enabled but a non-HTTP-100 response comes back,
+        the response message might not get set correctly by the server.
+        So this handling avoids breaking backward compatibility
+        if someone has taken a dependency on the exception message,
+        which is created using the error string present in the response header.
+      */
+      int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
+      if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
+        LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
+ reqParams.setExpectHeaderEnabled(false);
+ reqParams.setRetryDueToExpect(true);
+ return this.append(path, buffer, reqParams, cachedSasToken,
+ contextEncryptionAdapter, tracingContext);
+ }
+ // If we have no HTTP response, throw the original exception.
+ if (!op.hasResult()) {
+ throw e;
+ }
+
+ if (isMd5ChecksumError(e)) {
+ throw new AbfsInvalidChecksumException(e);
+ }
+
+ throw e;
+ }
+ catch (AzureBlobFileSystemException e) {
+      // Any server-side issue will be returned as an AbfsRestOperationException and handled above.
+      LOG.debug("Append request failed with non-server issues for path: {}, offset: {}, position: {}",
+ path, reqParams.getoffset(), reqParams.getPosition());
+ throw e;
+ }
+ return op;
+ }
+
+ /**
+ * Blob Endpoint needs blockIds to flush the data.
+ * This method is not supported on Blob Endpoint.
+ * @param path on which data has to be flushed.
+ * @param position to which data has to be flushed.
+   * @param retainUncommittedData whether to retain uncommitted data after flush.
+ * @param isClose specify if this is the last flush to the file.
+ * @param cachedSasToken to be used for the authenticating operation.
+ * @param leaseId if there is an active lease on the path.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the server calls.
+ * @return exception as this operation is not supported on Blob Endpoint.
+ * @throws UnsupportedOperationException always.
+ */
+ @Override
+ public AbfsRestOperation flush(final String path,
+ final long position,
+ final boolean retainUncommittedData,
+ final boolean isClose,
+ final String cachedSasToken,
+ final String leaseId,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ throw new UnsupportedOperationException(
+ "Flush without blockIds not supported on Blob Endpoint");
+ }
+
+ /**
+ * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list">
+ * Put Block List</a>.
+ * The flush operation to commit the blocks.
+   * @param buffer This has the xml in byte format with the blockIds to be flushed.
+ * @param path The path to flush the data to.
+ * @param isClose True when the stream is closed.
+ * @param cachedSasToken The cachedSasToken if available.
+ * @param leaseId The leaseId of the blob if available.
+ * @param eTag The etag of the blob.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation flush(byte[] buffer,
+ final String path,
+ boolean isClose,
+ final String cachedSasToken,
+ final String leaseId,
+ final String eTag,
+ ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+ contextEncryptionAdapter, tracingContext);
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length)));
+ requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML));
+ requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+ if (leaseId != null) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+ }
+ String md5Hash = computeMD5Hash(buffer, 0, buffer.length);
+ requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Hash));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
+    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
+ abfsUriQueryBuilder, cachedSasToken);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.PutBlockList,
+ HTTP_METHOD_PUT, url, requestHeaders,
+ buffer, 0, buffer.length,
+ sasTokenForReuse);
+ try {
+ op.execute(tracingContext);
+ } catch (AbfsRestOperationException ex) {
+      // If 412 Condition Not Met error is seen on retry verify using MD5 hash that the previous request by the same client might be
Review Comment:
Could you rephrase this for better readability?
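For example, something along these lines (assuming the intent is that the earlier attempt may already have succeeded):
```java
// A 412 (Precondition Failed) on a retried PutBlockList can mean the
// previous attempt from this client already committed the block list.
// Verify via the blob's MD5 hash before surfacing the error to the caller.
```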
> ABFS: [FnsOverBlob] Implementing Azure Rest APIs on Blob Endpoint for
> AbfsBlobClient
> -----------------------------------------------------------------------------------
>
> Key: HADOOP-19226
> URL: https://issues.apache.org/jira/browse/HADOOP-19226
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Anuj Modi
> Assignee: Anuj Modi
> Priority: Major
> Labels: pull-request-available
>
> This is the second task in a series of tasks for implementing Blob Endpoint
> support for FNS accounts.
> This patch contains the changes to implement all the APIs over the Blob
> Endpoint as part of implementing AbfsBlobClient.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]