[
https://issues.apache.org/jira/browse/HADOOP-19232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17911359#comment-17911359
]
ASF GitHub Bot commented on HADOOP-19232:
-----------------------------------------
bhattmanish98 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1908246055
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String LIST = "list";
+ public static final String BLOCK_BLOB_TYPE = "BlockBlob";
Review Comment:
BlockBlob should be in camel case.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String LIST = "list";
+ public static final String BLOCK_BLOB_TYPE = "BlockBlob";
+ public static final String APPEND_BLOCK = "appendblock";
Review Comment:
Same as above: appendBlock
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -786,18 +1016,165 @@ public String toString() {
return sb.toString();
}
+ /**
* Gets the back reference to the file system.
+ *
+ * @return The back reference to the file system.
+ */
@VisibleForTesting
BackReference getFsBackRef() {
return fsBackRef;
}
+ /**
+ * Gets the executor service used for asynchronous operations.
+ *
+ * @return The executor service.
+ */
@VisibleForTesting
ListeningExecutorService getExecutorService() {
return executorService;
}
+ /**
+ * Gets the Azure Blob Storage client.
+ *
+ * @return The Azure Blob Storage client.
+ */
@VisibleForTesting
- AbfsClient getClient() {
+ synchronized AbfsClient getClient() {
Review Comment:
What is the need for this change?
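For context, a `synchronized` getter usually only matters when the field it returns can be reassigned concurrently, as with an ingress fallback that swaps the client at runtime. A minimal sketch of that pattern, with `Object` standing in for `AbfsClient` and hypothetical method names (not the actual Hadoop API):

```java
// Hypothetical sketch: 'Object' stands in for AbfsClient; the class and
// method names are illustrative, not the real Hadoop implementation.
class ClientHolder {
    private Object client; // can be swapped at runtime, e.g. on ingress fallback

    ClientHolder(Object initial) { this.client = initial; }

    // Writer: swaps the client (e.g. a DFS-to-Blob fallback).
    synchronized void switchClient(Object newClient) { this.client = newClient; }

    // Reader: synchronized so it always observes a fully completed swap.
    synchronized Object getClient() { return client; }
}
```

Synchronizing only the setter would not be enough: without a synchronized (or volatile) read, a reader thread may see a stale client reference.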
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -376,31 +576,15 @@ private void failureWhileSubmit(Exception ex) throws IOException {
throw lastError;
}
- /**
- * Synchronized accessor to the active block.
- *
- * @return the active block; null if there isn't one.
- */
- private synchronized DataBlocks.DataBlock getActiveBlock() {
- return activeBlock;
- }
-
- /**
- * Predicate to query whether or not there is an active block.
- *
- * @return true if there is an active block.
- */
- private synchronized boolean hasActiveBlock() {
- return activeBlock != null;
- }
-
/**
* Is there an active block and is there any data in it to upload?
*
* @return true if there is some data to upload in an active block else false.
*/
- private boolean hasActiveBlockDataToUpload() {
- return hasActiveBlock() && getActiveBlock().hasData();
+ boolean hasActiveBlockDataToUpload() {
+ AzureBlockManager blockManager = getBlockManager();
+ AbfsBlock activeBlock = blockManager.getActiveBlock();
+ return blockManager.hasActiveBlock() && activeBlock.hasData();
Review Comment:
Can blockManager or activeBlock be null in any case?
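To illustrate the concern: `hasActiveBlock()` and `getActiveBlock()` are separate calls, so if the active block can be cleared in between, the checked reference and the dereferenced one may differ. A hedged sketch of a null-safe ordering, using hypothetical stand-in classes rather than the real Hadoop types:

```java
// Hypothetical stand-in types, only to illustrate the null-safety question;
// these are not the real Hadoop classes.
class SketchBlock {
    private final int size;
    SketchBlock(int size) { this.size = size; }
    boolean hasData() { return size > 0; }
}

class SketchBlockManager {
    private SketchBlock activeBlock; // null until the first block is created
    void setActiveBlock(SketchBlock b) { activeBlock = b; }
    SketchBlock getActiveBlock() { return activeBlock; }
    boolean hasActiveBlock() { return activeBlock != null; }
}

class UploadCheck {
    // Null-safe version: fetch the block once and test that same reference,
    // so a block cleared between two calls cannot cause an NPE.
    static boolean hasActiveBlockDataToUpload(SketchBlockManager m) {
        if (m == null) {
            return false;
        }
        SketchBlock activeBlock = m.getActiveBlock();
        return activeBlock != null && activeBlock.hasData();
    }
}
```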
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -1140,7 +1387,7 @@ public boolean checkIsDir(AbfsHttpOperation result) {
public boolean checkUserError(int responseStatusCode) {
return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
&& responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
- && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
+ && responseStatusCode != HTTP_CONFLICT);
Review Comment:
We can make the same change for the two constants above (HTTP_INTERNAL_ERROR,
HTTP_BAD_REQUEST) as well.
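For reference, all three codes are constants on `java.net.HttpURLConnection`, so the suggestion amounts to static-importing the other two as well. A self-contained sketch of the resulting predicate:

```java
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;

class UserErrorCheck {
    // Same predicate as in the PR, with all three codes statically imported:
    // 4xx responses are treated as user errors, except 409 Conflict.
    static boolean checkUserError(int responseStatusCode) {
        return responseStatusCode >= HTTP_BAD_REQUEST      // 400
            && responseStatusCode < HTTP_INTERNAL_ERROR    // 500
            && responseStatusCode != HTTP_CONFLICT;        // 409
    }
}
```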
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlock.java:
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Return activeBlock with blockId.
+ */
+public class AbfsBlock implements Closeable {
+
+ private final DataBlocks.DataBlock activeBlock;
+ protected AbfsOutputStream outputStream;
+ private final long offset;
+ private BlockEntry blockEntry;
+
+ /**
+ * Gets the activeBlock and the blockId.
+ * @param outputStream AbfsOutputStream Instance.
+ * @param offset Used to generate blockId based on offset.
+ * @throws IOException
+ */
+ AbfsBlock(AbfsOutputStream outputStream, long offset) throws IOException {
+ this.outputStream = outputStream;
+ this.offset = offset;
+ DataBlocks.BlockFactory blockFactory = outputStream.getBlockManager().getBlockFactory();
+ long blockCount = outputStream.getBlockManager().getBlockCount();
Review Comment:
Since these variables (blockCount, blockSize) are each used in only one
place, it would be better to inline the calls.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java:
##########
@@ -43,11 +45,18 @@ public final class AbfsErrors {
public static final String ERR_LEASE_DID_NOT_MATCH = "The lease ID specified did not match the "
+ "lease ID for the resource with the specified lease operation";
public static final String ERR_LEASE_BROKEN = "The lease ID matched, but the lease has been "
-    + "broken explicitly and cannot be renewed";
+        + "broken explicitly and cannot be renewed";
public static final String ERR_LEASE_FUTURE_EXISTS = "There is already an existing lease "
+ "operation";
public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads "
+ "configured, set " + FS_AZURE_LEASE_THREADS;
public static final String ERR_CREATE_ON_ROOT = "Cannot create file over root path";
+ public static final String PATH_EXISTS = "The specified path, or an element of the path, "
+     + "exists and its resource type is invalid for this operation.";
+ public static final String BLOB_OPERATION_NOT_SUPPORTED = "Blob operation is not supported.";
+ public static final String INVALID_APPEND_OPERATION = "The resource was created or modified by the Azure Blob Service API "
+     + "and cannot be appended to by the Azure Data Lake Storage Service API";
Review Comment:
Typo: this should read `and cannot be appended by the Azure Data Lake
Storage Service API`.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Manages Azure Blob blocks for append operations.
+ */
+public class AzureBlobBlockManager extends AzureBlockManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsOutputStream.class);
+
+
+ /** The list of already committed blocks is stored in this list. */
+ private List<String> committedBlockEntries = new ArrayList<>();
+
+ /** The list to store blockId, position, and status. */
+ private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
+
+
+ /**
+ * Constructs an AzureBlobBlockManager.
+ *
+ * @param abfsOutputStream the output stream
+ * @param blockFactory the block factory
+ * @param bufferSize the buffer size
+ * @throws AzureBlobFileSystemException if an error occurs
+ */
+ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
+ DataBlocks.BlockFactory blockFactory,
+ int bufferSize)
+ throws AzureBlobFileSystemException {
+ super(abfsOutputStream, blockFactory, bufferSize);
+ if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) {
+ this.committedBlockEntries = getBlockList(abfsOutputStream.getTracingContext());
+ }
+ LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}",
+ abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+ }
+
+ /**
+ * Creates a new block.
+ *
+ * @param position the position
+ * @return the created block
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ protected synchronized AbfsBlock createBlockInternal(long position)
+ throws IOException {
+ if (activeBlock == null) {
+ blockCount++;
+ activeBlock = new AbfsBlobBlock(abfsOutputStream, position);
+ activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), activeBlock.getOffset()));
+ }
+ return activeBlock;
+ }
+
+ /**
+ * Returns block id's which are committed for the blob.
+ *
+ * @param tracingContext Tracing context object.
+ * @return list of committed block id's.
+ * @throws AzureBlobFileSystemException if an error occurs
+ */
+ private List<String> getBlockList(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ List<String> committedBlockIdList;
+ AbfsBlobClient blobClient = abfsOutputStream.getClientHandler().getBlobClient();
+ final AbfsRestOperation op = blobClient
+ .getBlockList(abfsOutputStream.getPath(), tracingContext);
+ committedBlockIdList = op.getResult().getBlockIdList();
Review Comment:
A check on `op` and `op.getResult()` before calling `getBlockIdList` is needed.
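A hedged sketch of the suggested guard, using hypothetical stand-ins for `AbfsRestOperation` and its HTTP result (not the real Hadoop types):

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins, only to show the guard the review asks for.
class SketchHttpResult {
    private final List<String> blockIds;
    SketchHttpResult(List<String> blockIds) { this.blockIds = blockIds; }
    List<String> getBlockIdList() { return blockIds; }
}

class SketchRestOperation {
    private final SketchHttpResult result; // may be null if the call failed early
    SketchRestOperation(SketchHttpResult result) { this.result = result; }
    SketchHttpResult getResult() { return result; }
}

class GuardedBlockList {
    // Guard op and op.getResult() before dereferencing; fall back to an
    // empty list rather than risking a NullPointerException.
    static List<String> getCommittedBlockIds(SketchRestOperation op) {
        if (op == null || op.getResult() == null) {
            return Collections.emptyList();
        }
        return op.getResult().getBlockIdList();
    }
}
```

Whether an empty list or an exception is the right fallback depends on how the caller treats a blob with no committed blocks; the sketch only shows the defensive shape.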
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -59,7 +59,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;
- private static final int ONE_THOUSAND = 1000;
+ public static final int ONE_THOUSAND = 1000;
Review Comment:
Any reason for making it public?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -522,13 +716,21 @@ public synchronized void close() throws IOException {
bufferIndex = 0;
closed = true;
writeOperations.clear();
- if (hasActiveBlock()) {
- clearActiveBlock();
- }
+ getBlockManager().clearActiveBlock();
Review Comment:
Should we not have a check on hasActiveBlock before calling clearActiveBlock?
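Two ways this could be addressed: restore the `hasActiveBlock()` guard at the call site, or make `clearActiveBlock()` itself a no-op when no block is active. A minimal sketch showing both, with hypothetical class names rather than the actual Hadoop implementation:

```java
// Hypothetical sketch of the close() cleanup under discussion.
class SketchManager {
    private Object activeBlock;
    void setActiveBlock(Object b) { activeBlock = b; }
    boolean hasActiveBlock() { return activeBlock != null; }

    // Idempotent: clearing when no block is active is a harmless no-op.
    synchronized void clearActiveBlock() { activeBlock = null; }
}

class CloseCleanup {
    // Call-site guard as suggested in the review; with an idempotent
    // clearActiveBlock() it is optional, but it makes the intent explicit.
    static void closeStream(SketchManager manager) {
        if (manager.hasActiveBlock()) {
            manager.clearActiveBlock();
        }
    }
}
```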
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java:
##########
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+import org.apache.hadoop.io.IOUtils;
+
+public class AzureBlobIngressHandler extends AzureIngressHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsOutputStream.class);
+
+ private volatile String eTag;
+
+ private final AzureBlobBlockManager blobBlockManager;
+
+ private final AbfsBlobClient blobClient;
+
+ private final AbfsClientHandler clientHandler;
+
+ /**
+ * Constructs an AzureBlobIngressHandler.
+ *
+ * @param abfsOutputStream the AbfsOutputStream.
+ * @param blockFactory the block factory.
+ * @param bufferSize the buffer size.
+ * @param eTag the eTag.
+ * @param clientHandler the client handler.
+ * @param blockManager the block manager.
+ * @throws AzureBlobFileSystemException if an error occurs.
+ */
+ public AzureBlobIngressHandler(AbfsOutputStream abfsOutputStream,
+ DataBlocks.BlockFactory blockFactory,
+ int bufferSize, String eTag, AbfsClientHandler clientHandler, AzureBlockManager blockManager)
+ throws AzureBlobFileSystemException {
+ super(abfsOutputStream);
+ this.eTag = eTag;
+ if (blockManager instanceof AzureBlobBlockManager) {
+ this.blobBlockManager = (AzureBlobBlockManager) blockManager;
+ } else {
+ this.blobBlockManager = new AzureBlobBlockManager(this.abfsOutputStream,
+ blockFactory, bufferSize);
+ }
+ this.clientHandler = clientHandler;
+ this.blobClient = clientHandler.getBlobClient();
+ LOG.trace("Created a new BlobIngress Handler for AbfsOutputStream instance {} for path {}",
+ abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+ }
+
+ /**
+ * Buffers data into the specified block.
+ *
+ * @param block the block to buffer data into.
+ * @param data the data to be buffered.
+ * @param off the start offset in the data.
+ * @param length the number of bytes to buffer.
+ * @return the number of bytes buffered.
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ protected int bufferData(AbfsBlock block,
+ final byte[] data,
+ final int off,
+ final int length)
+ throws IOException {
+ LOG.trace("Buffering data of length {} to block at offset {}", length, off);
+ return block.write(data, off, length);
+ }
+
+ /**
+ * Performs a remote write operation.
+ *
+ * @param blockToUpload the block to upload.
+ * @param uploadData the data to upload.
+ * @param reqParams the request parameters.
+ * @param tracingContext the tracing context.
+ * @return the resulting AbfsRestOperation.
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
+ DataBlocks.BlockUploadData uploadData,
+ AppendRequestParameters reqParams,
+ TracingContext tracingContext)
+ throws IOException {
+ BlobAppendRequestParameters blobParams = new BlobAppendRequestParameters(blockToUpload.getBlockId(), getETag());
+ reqParams.setBlobParams(blobParams);
+ AbfsRestOperation op;
+ long threadId = Thread.currentThread().getId();
Review Comment:
Can we call Thread.currentThread().getId() inline, since it is used only
once?
> ABFS: [FnsOverBlob] Implementing Ingress Support with various Fallback
> Handling
> -------------------------------------------------------------------------------
>
> Key: HADOOP-19232
> URL: https://issues.apache.org/jira/browse/HADOOP-19232
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Descifrado
> Assignee: Anmol Asrani
> Priority: Major
> Labels: pull-request-available
>
> Scope of this task is to refactor the AbfsOutputStream class to handle the
> ingress for DFS and Blob endpoint effectively.
> More details will be added soon.
> Prerequisites for this Patch:
> 1. [HADOOP-19187] ABFS: [FnsOverBlob]Making AbfsClient Abstract for
> supporting both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. [HADOOP-19226] ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. [HADOOP-19207] ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs
> and Metadata APIs - ASF JIRA (apache.org)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)