rakeshadr commented on code in PR #6944:
URL: https://github.com/apache/hadoop/pull/6944#discussion_r1766116568
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java:
##########
@@ -84,24 +92,40 @@ public AbfsClient getClient() {
/**
* Get the AbfsClient based on the service type.
- * @param serviceType AbfsServiceType
+ * @param serviceType AbfsServiceType.
* @return AbfsClient
*/
public AbfsClient getClient(AbfsServiceType serviceType) {
- return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : null;
+ return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : blobAbfsClient;
+ }
+
+ /**
+ * Get the AbfsDfsClient.
+ * @return AbfsDfsClient.
+ */
+ public AbfsDfsClient getDfsClient() {
Review Comment:
Can we please remove the unused APIs? Later, based on need, we can
introduce the necessary getters or APIs in future PRs.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -0,0 +1,1087 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_OCTET_STREAM;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_XML;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK_TYPE_COMMITTED;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BREAK_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CONTAINER;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_LEASE_BREAK_PERIOD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LEASE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.METADATA;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RELEASE_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RENEW_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_DURATION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_META_HDI_ISFOLDER;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RANGE_GET_CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SOURCE_LEASE_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKLISTTYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE;
+
+/**
+ * AbfsClient interacting with Blob endpoint.
+ */
+public class AbfsBlobClient extends AbfsClient {
+
+ public AbfsBlobClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final AccessTokenProvider tokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext) throws IOException {
+ super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
+ encryptionContextProvider, abfsClientContext);
+ }
+
+ public AbfsBlobClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final SASTokenProvider sasTokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext) throws IOException {
+ super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
+ encryptionContextProvider, abfsClientContext);
+ }
+
+ /**
+ * Create request headers for Rest Operation using the default API version.
+ * @return default request headers.
+ */
+ @Override
+ public List<AbfsHttpHeader> createDefaultHeaders() {
+ return this.createDefaultHeaders(getxMsVersion());
+ }
+
+ /**
+ * Create request headers for Rest Operation using the specified API version.
+ * Blob Endpoint API responses are in JSON/XML format.
+ * @param xMsVersion API version to be used.
+ * @return default request headers
+ */
+ @Override
+ public List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion) {
+ List<AbfsHttpHeader> requestHeaders = super.createCommonHeaders(xMsVersion);
+ requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON
+ + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM
+ + COMMA + SINGLE_WHITE_SPACE + APPLICATION_XML));
+ return requestHeaders;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/create-container">
+ * Create Container</a>.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation createFilesystem(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.CreateContainer,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/set-container-metadata">
+ * Set Container Metadata</a>.
+ * @param properties comma separated list of metadata key-value pairs.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation setFilesystemProperties(final Hashtable<String, String> properties,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ /*
+ * Blob Endpoint supports Unicode characters but DFS Endpoint only allows ASCII.
+ * To match the behavior across endpoints, the driver throws an exception if non-ASCII characters are found.
+ */
+ try {
+ List<AbfsHttpHeader> metadataRequestHeaders = getMetadataHeadersList(properties);
+ requestHeaders.addAll(metadataRequestHeaders);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+
+ AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, METADATA);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.SetContainerMetadata,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/get-container-properties">
+ * Get Container Metadata</a>.
+ * Gets all the properties of the filesystem.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.GetContainerProperties,
+ HTTP_METHOD_HEAD, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/delete-container">
+ * Delete Container</a>.
+ * Deletes the Container acting as current filesystem.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.DeleteContainer,
+ HTTP_METHOD_DELETE, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/list-blobs">
+ * List Blobs</a>.
+ * @param relativePath to return only blobs with names that begin with the specified prefix.
+ * @param recursive to return all blobs in the path, including those in subdirectories.
+ * @param listMaxResults maximum number of blobs to return.
+ * @param continuation marker to specify the continuation token.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation or response parsing fails.
+ */
+ @Override
+ public AbfsRestOperation listPath(final String relativePath,
+ final boolean recursive,
+ final int listMaxResults,
+ final String continuation,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ // Todo: [FnsOverBlob] To be implemented as part of response handling of blob endpoint APIs.
+ throw new NotImplementedException("Blob Endpoint Support is not yet implemented");
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob">
+ * Put Blob</a>.
+ * Creates a file or directory (marker file) at the specified path.
+ * @param path of the directory to be created.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation createPath(final String path,
+ final boolean isFile,
+ final boolean overwrite,
+ final AzureBlobFileSystemStore.Permissions permissions,
+ final boolean isAppendBlob,
+ final String eTag,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ // Todo: [FnsOverBlob] To be implemented as part of ingress work over blob endpoint.
+ throw new NotImplementedException("Create Path operation on Blob endpoint yet to be implemented.");
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob">
+ * Lease Blob</a>.
+ * @param path on which lease has to be acquired.
+ * @param duration for which lease has to be acquired.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation acquireLease(final String path, final int duration,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration)));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString()));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob">
+ * Lease Blob</a>.
+ * @param path on which lease has to be renewed.
+ * @param leaseId of the lease to be renewed.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation renewLease(final String path, final String leaseId,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob">
+ * Lease Blob</a>.
+ * @param path on which lease has to be released.
+ * @param leaseId of the lease to be released.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation releaseLease(final String path, final String leaseId,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob">
+ * Lease Blob</a>.
+ * @param path on which lease has to be broken.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation breakLease(final String path,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ @Override
+ public AbfsClientRenameResult renamePath(final String source,
+ final String destination,
+ final String continuation,
+ final TracingContext tracingContext,
+ final String sourceEtag,
+ final boolean isMetadataIncompleteState,
+ final boolean isNamespaceEnabled) throws IOException {
+ // Todo: [FnsOverBlob] To be implemented as part of rename-delete over blob endpoint work.
+ throw new NotImplementedException("Rename operation on Blob endpoint yet to be implemented.");
+ }
+
+ /**
+ * Get Rest Operation for API
+ * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-block">
+ * Put Block</a>.
+ * Uploads data to be appended to a file.
+ * @param path to which data has to be appended.
+ * @param buffer containing data to be appended.
+ * @param reqParams containing parameters for append operation like offset, length etc.
+ * @param cachedSasToken to be used for the authenticating operation.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation append(final String path,
+ final byte[] buffer,
+ final AppendRequestParameters reqParams,
+ final String cachedSasToken,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+ contextEncryptionAdapter, tracingContext);
+ requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length)));
+ requestHeaders.add(new AbfsHttpHeader(IF_MATCH, reqParams.getETag()));
+ if (reqParams.getLeaseId() != null) {
Review Comment:
General comment: I'm wondering about the test coverage of this new Blob
endpoint class. We have configs, if-else blocks, and exception paths, and it
would be good to improve coverage. This could even be a sub-task, taken up
immediately after this PR and before the feature release.
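For illustration, a rough sketch of one such test. The
`getAbfsStore().getClientHandler()` accessor chain and the class name are
assumptions (not shown in this diff), and it relies on `listPath` on the Blob
client still being unimplemented at this stage:

```java
// Hypothetical coverage sketch, not part of this PR; class name and
// accessor wiring are illustrative only.
package org.apache.hadoop.fs.azurebfs.services;

import org.apache.commons.lang3.NotImplementedException;
import org.junit.Test;

import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.test.LambdaTestUtils;

public class ITestAbfsBlobClientCoverage extends AbstractAbfsIntegrationTest {

  public ITestAbfsBlobClientCoverage() throws Exception {
  }

  @Test
  public void testBlobListPathNotYetImplemented() throws Exception {
    AzureBlobFileSystem fs = getFileSystem();
    // Resolve the Blob client via the service-type lookup instead of a
    // dedicated getter (assumed wiring, for illustration only).
    AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsStore()
        .getClientHandler().getClient(AbfsServiceType.BLOB);
    // Unimplemented Blob APIs should fail fast with NotImplementedException.
    LambdaTestUtils.intercept(NotImplementedException.class, () ->
        blobClient.listPath("/", true, 5, null,
            getTestTracingContext(fs, true)));
  }
}
```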
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java:
##########
@@ -63,6 +68,9 @@ public AbfsClientHandler(final URL baseUrl,
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
+ this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
+ abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
Review Comment:
Do we need to initialize and instantiate both `dfsAbfsClient` and
`blobAbfsClient` objects? Are we planning to support dual behavior within an
`AzureBlobFileSystemStore`?
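If dual behavior isn't needed yet, lazy instantiation could be an alternative.
A rough sketch, assuming the constructor arguments are kept as fields of
`AbfsClientHandler` (they are not in the current diff):

```java
// Illustrative only: create the Blob client on first use, so single-endpoint
// workloads never pay for the second client. Double-checked locking on a
// volatile field keeps the initialization thread-safe.
private volatile AbfsBlobClient blobAbfsClient;

private AbfsBlobClient getOrCreateBlobClient() throws IOException {
  if (blobAbfsClient == null) {
    synchronized (this) {
      if (blobAbfsClient == null) {
        blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
            abfsConfiguration, null, sasTokenProvider,
            encryptionContextProvider, abfsClientContext);
      }
    }
  }
  return blobAbfsClient;
}
```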
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java:
##########
@@ -84,24 +92,40 @@ public AbfsClient getClient() {
/**
* Get the AbfsClient based on the service type.
- * @param serviceType AbfsServiceType
+ * @param serviceType AbfsServiceType.
* @return AbfsClient
*/
public AbfsClient getClient(AbfsServiceType serviceType) {
- return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : null;
+ return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : blobAbfsClient;
+ }
+
+ /**
+ * Get the AbfsDfsClient.
+ * @return AbfsDfsClient.
+ */
+ public AbfsDfsClient getDfsClient() {
+ return dfsAbfsClient;
+ }
+
+ /**
+ * Get the AbfsBlobClient.
+ * @return AbfsBlobClient.
+ */
+ public AbfsBlobClient getBlobClient() {
Review Comment:
Same here as well. Please remove it, as it is an unused getter within the scope of this PR.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClientHandler.java:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
+
+/**
+ * Test AbfsClientHandler initialization.
+ */
+public class ITestAbfsClientHandler extends AbstractAbfsIntegrationTest {
+
+ public ITestAbfsClientHandler() throws Exception {
Review Comment:
The default constructor is not required, right?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java:
##########
@@ -37,6 +37,14 @@ public enum Mode {
private boolean isExpectHeaderEnabled;
private boolean isRetryDueToExpect;
+ /*
+ * Following parameters are used by AbfsBlobClient only.
+ * Blob Endpoint Append API requires blockId and eTag to be passed in the request.
+ */
+ private String blockId;
Review Comment:
How about adding a new POJO that holds all the newly added Blob-specific
request parameters, and then referencing that object here? Basically it's for
better maintainability; in future one can add more params. Roughly as sketched
below.
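For illustration only (the name `BlobAppendRequestParameters` and its shape
are hypothetical):

```java
// Hypothetical POJO grouping the Blob-specific append parameters, so that
// AppendRequestParameters carries a single nullable reference to it and
// future Blob-only parameters land here rather than on the shared class.
public class BlobAppendRequestParameters {
  /** Block id to be sent in the Put Block request. */
  private final String blockId;
  /** ETag to be sent as the If-Match precondition. */
  private final String eTag;

  public BlobAppendRequestParameters(String blockId, String eTag) {
    this.blockId = blockId;
    this.eTag = eTag;
  }

  public String getBlockId() {
    return blockId;
  }

  public String getETag() {
    return eTag;
  }
}
```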
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java:
##########
@@ -45,8 +45,7 @@ public enum FSOperationType {
SET_OWNER("SO"),
SET_ACL("SA"),
TEST_OP("TS"),
- WRITE("WR"),
- INIT("IN");
Review Comment:
It would be interesting to know the reason for removing INIT.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java:
##########
@@ -165,6 +165,10 @@ public int getRetryCount() {
return retryCount;
}
+ public FSOperationType getOpType() {
Review Comment:
General comment: please add Javadoc for the public methods introduced in this PR.
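For instance, for the getter in this hunk, something along these lines (the
wording is only a suggestion, and the body is assumed to return the `opType`
field):

```java
/**
 * Returns the filesystem operation type this tracing context was
 * created for, e.g. CREATE or OPEN.
 * @return the FSOperationType being tracked.
 */
public FSOperationType getOpType() {
  return opType;
}
```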
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java:
##########
@@ -42,4 +42,19 @@ public enum AbfsRestOperationType {
DeletePath,
CheckAccess,
LeasePath,
+ CreateContainer,
Review Comment:
Here also, how about grouping the enums?
- global enums
- DFS enums
- Blob enums

Roughly as sketched below.
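Illustration only — same constants, with just the ordering and section
comments changed (the exact grouping is up for discussion):

```java
public enum AbfsRestOperationType {
  // Existing DFS endpoint operations.
  DeletePath,
  CheckAccess,
  LeasePath,

  // Blob endpoint operations added in this PR.
  CreateContainer,
  SetContainerMetadata,
  GetContainerProperties,
  DeleteContainer,
  LeaseBlob
}
```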