This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch fix/adls-blob-soft-delete-support
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 6bbb0ebcd1f8b168b44f027f63710fe5d552df55
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Mar 3 10:48:42 2026 -0800

    Fix ADLS Gen2 plugin to support Azure Blob Soft Delete
    
    The DFS (Data Lake) API endpoint does not support Azure Blob Soft Delete,
    causing 409 EndpointUnsupportedAccountFeatures errors when uploading
    segments to storage accounts with org-mandated Soft Delete policies.
    
    This change adds a BlobContainerClient alongside the existing
    DataLakeFileSystemClient, using the Blob API (*.blob.core.windows.net)
    for file uploads, which is fully compatible with Soft Delete. The DFS API
    is retained for read/metadata operations (list, exists, open, etc.) and
    as a write fallback when the BlobContainerClient is not initialized
    (e.g. when the instance is constructed with only a DataLakeFileSystemClient).
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 pinot-plugins/pinot-file-system/pinot-adls/pom.xml |  4 +
 .../pinot/plugin/filesystem/ADLSGen2PinotFS.java   | 94 ++++++++++++++++++----
 .../filesystem/test/ADLSGen2PinotFSTest.java       | 64 ++++++++++++++-
 3 files changed, 146 insertions(+), 16 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml 
b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml
index a95142cb367..da0b7e40f56 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml
+++ b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml
@@ -38,6 +38,10 @@
       <groupId>com.azure</groupId>
       <artifactId>azure-storage-file-datalake</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-storage-blob</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.azure</groupId>
       <artifactId>azure-identity</artifactId>
diff --git 
a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index 0f294f9103e..f62bf90610a 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -26,6 +26,12 @@ import com.azure.core.util.Context;
 import com.azure.identity.ClientSecretCredential;
 import com.azure.identity.ClientSecretCredentialBuilder;
 import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobHttpHeaders;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
 import com.azure.storage.common.StorageSharedKeyCredential;
 import com.azure.storage.common.Utility;
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
@@ -110,6 +116,10 @@ public class ADLSGen2PinotFS extends BasePinotFS {
 
   private DataLakeFileSystemClient _fileSystemClient;
 
+  // Blob API client used for file uploads to support Azure storage accounts 
with Blob Soft Delete enabled.
+  // The DFS API (Data Lake) does not support Soft Delete, causing 409 
EndpointUnsupportedAccountFeatures errors.
+  private BlobContainerClient _blobContainerClient;
+
   // If enabled, pinotFS implementation will guarantee that the bits you've 
read are the same as the ones you wrote.
   // However, there's some overhead in computing hash. (Adds roughly 3 seconds 
for 1GB file)
   private boolean _enableChecksum;
@@ -117,10 +127,17 @@ public class ADLSGen2PinotFS extends BasePinotFS {
   public ADLSGen2PinotFS() {
   }
 
+  @VisibleForTesting
   public ADLSGen2PinotFS(DataLakeFileSystemClient fileSystemClient) {
     _fileSystemClient = fileSystemClient;
   }
 
+  /**
+   * Creates an instance that uses the given DFS client for read/metadata operations and the given Blob
+   * container client for uploads.
+   */
+  @VisibleForTesting
+  public ADLSGen2PinotFS(DataLakeFileSystemClient fileSystemClient, BlobContainerClient blobContainerClient) {
+    this(fileSystemClient);
+    _blobContainerClient = blobContainerClient;
+  }
+
   @Override
   public void init(PinotConfiguration config) {
     _enableChecksum = config.getProperty(ENABLE_CHECKSUM, false);
@@ -145,9 +162,12 @@ public class ADLSGen2PinotFS extends BasePinotFS {
     String sasToken = config.getProperty(SAS_TOKEN);
 
     String dfsServiceEndpointUrl = HTTPS_URL_PREFIX + accountName + 
AZURE_STORAGE_DNS_SUFFIX;
+    String blobServiceEndpointUrl = HTTPS_URL_PREFIX + accountName + 
AZURE_BLOB_DNS_SUFFIX;
 
     DataLakeServiceClientBuilder dataLakeServiceClientBuilder =
         new DataLakeServiceClientBuilder().endpoint(dfsServiceEndpointUrl);
+    BlobServiceClientBuilder blobServiceClientBuilder =
+        new BlobServiceClientBuilder().endpoint(blobServiceEndpointUrl);
 
     switch (authType) {
       case ACCESS_KEY: {
@@ -157,6 +177,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
 
         StorageSharedKeyCredential sharedKeyCredential = new 
StorageSharedKeyCredential(accountName, accessKey);
         dataLakeServiceClientBuilder.credential(sharedKeyCredential);
+        blobServiceClientBuilder.credential(sharedKeyCredential);
         break;
       }
       case SAS_TOKEN: {
@@ -166,6 +187,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
 
         AzureSasCredential azureSasCredential = new 
AzureSasCredential(sasToken);
         dataLakeServiceClientBuilder.credential(azureSasCredential);
+        blobServiceClientBuilder.credential(azureSasCredential);
         break;
       }
       case AZURE_AD: {
@@ -178,6 +200,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
             new 
ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId)
                 .build();
         dataLakeServiceClientBuilder.credential(clientSecretCredential);
+        blobServiceClientBuilder.credential(clientSecretCredential);
         break;
       }
       case AZURE_AD_WITH_PROXY: {
@@ -198,7 +221,9 @@ public class ADLSGen2PinotFS extends BasePinotFS {
             new 
ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId);
         clientSecretCredentialBuilder.httpClient(builder.build());
 
-        
dataLakeServiceClientBuilder.credential(clientSecretCredentialBuilder.build());
+        ClientSecretCredential credential = 
clientSecretCredentialBuilder.build();
+        dataLakeServiceClientBuilder.credential(credential);
+        blobServiceClientBuilder.credential(credential);
         break;
       }
       case DEFAULT: {
@@ -217,6 +242,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
           defaultAzureCredentialBuilder.authorityHost(authorityHost);
         }
         
dataLakeServiceClientBuilder.credential(defaultAzureCredentialBuilder.build());
+        
blobServiceClientBuilder.credential(defaultAzureCredentialBuilder.build());
         break;
       }
       case ANONYMOUS_ACCESS: {
@@ -232,8 +258,12 @@ public class ADLSGen2PinotFS extends BasePinotFS {
     DataLakeServiceClient serviceClient = 
dataLakeServiceClientBuilder.buildClient();
     _fileSystemClient = getOrCreateClientWithFileSystem(serviceClient, 
fileSystemName);
 
+    BlobServiceClient blobServiceClient = 
blobServiceClientBuilder.buildClient();
+    _blobContainerClient = 
blobServiceClient.getBlobContainerClient(fileSystemName);
+
     LOGGER.info("ADLSGen2PinotFS is initialized (accountName={}, 
fileSystemName={}, dfsServiceEndpointUrl={}, "
-        + "enableChecksum={})", accountName, fileSystemName, 
dfsServiceEndpointUrl, _enableChecksum);
+        + "blobServiceEndpointUrl={}, enableChecksum={})", accountName, 
fileSystemName, dfsServiceEndpointUrl,
+        blobServiceEndpointUrl, _enableChecksum);
   }
 
   /**
@@ -541,7 +571,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
     LOGGER.debug("copyFromLocalFile is called with srcFile='{}', dstUri='{}'", 
srcFile, dstUri);
     byte[] contentMd5 = computeContentMd5(srcFile);
     try (InputStream fileInputStream = new FileInputStream(srcFile)) {
-      copyInputStreamToDst(fileInputStream, dstUri, contentMd5);
+      copyInputStreamToDst(fileInputStream, dstUri, contentMd5, 
srcFile.length());
     }
   }
 
@@ -628,34 +658,73 @@ public class ADLSGen2PinotFS extends BasePinotFS {
     PathProperties pathProperties =
         
_fileSystemClient.getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(srcUri)).getProperties();
     try (InputStream inputStream = open(srcUri)) {
-      return copyInputStreamToDst(inputStream, dstUri, 
pathProperties.getContentMd5());
+      return copyInputStreamToDst(inputStream, dstUri, 
pathProperties.getContentMd5(),
+          pathProperties.getFileSize());
     }
   }
 
   /**
    * Helper function to copy input stream to destination URI.
    *
+   * <p>Uses the Azure Blob API for uploads, which is compatible with storage 
accounts that have Blob Soft Delete
+   * enabled. The DFS (Data Lake) API does not support Soft Delete and will 
fail with 409
+   * EndpointUnsupportedAccountFeatures on such accounts.</p>
+   *
    * NOTE: the caller has to close the input stream.
    *
    * @param inputStream input stream that will be written to dstUri
    * @param dstUri destination URI
+   * @param contentMd5 optional MD5 hash of the content
+   * @param contentLength length of the content in bytes
    * @return true if the copy succeeds
    */
-  private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, 
byte[] contentMd5)
+  private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, byte[] contentMd5, long contentLength)
+      throws IOException {
+    String azurePath = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+    // Uploads prefer the Blob endpoint (compatible with Blob Soft Delete); the DFS endpoint is only used when no
+    // BlobContainerClient has been configured on this instance.
+    if (_blobContainerClient == null) {
+      return copyInputStreamToDstViaDfs(inputStream, dstUri, azurePath, contentMd5);
+    }
+    return copyInputStreamToDstViaBlob(inputStream, dstUri, azurePath, contentMd5, contentLength);
+  }
+
+  /**
+   * Upload via the Azure Blob API, which is compatible with Blob Soft Delete.
+   *
+   * <p>The content MD5 is sent as part of the upload rather than through a follow-up
+   * {@code BlobClient.setHttpHeaders()} call: {@code setHttpHeaders()} replaces all blob HTTP headers, so setting
+   * only Content-MD5 afterwards would erase the others (e.g. Content-Type). When {@code _enableChecksum} is set, the
+   * SDK is asked to compute and send a transactional MD5 for the service to verify, mirroring the per-append MD5
+   * that the DFS upload path sends.</p>
+   *
+   * @param inputStream content to upload; the caller is responsible for closing it
+   * @param dstUri destination URI (used for logging)
+   * @param path Azure-style path of the destination blob within the container
+   * @param contentMd5 optional MD5 hash stored as the blob's Content-MD5 property; may be null
+   * @param contentLength length of the content in bytes
+   * @return true if the upload succeeds
+   * @throws IOException if the Blob service rejects the upload
+   */
+  private boolean copyInputStreamToDstViaBlob(InputStream inputStream, URI dstUri, String path, byte[] contentMd5,
+      long contentLength)
+      throws IOException {
+    try {
+      BlobClient blobClient = _blobContainerClient.getBlobClient(path);
+      if (contentMd5 == null && !_enableChecksum) {
+        // Nothing extra to attach: plain upload that overwrites any existing blob.
+        blobClient.upload(inputStream, contentLength, true);
+      } else {
+        // No request conditions are set, so this also overwrites an existing blob, like upload(..., true).
+        BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(inputStream)
+            .setHeaders(new BlobHttpHeaders().setContentMd5(contentMd5))
+            .setComputeMd5(_enableChecksum);
+        blobClient.uploadWithResponse(uploadOptions, null, Context.NONE);
+      }
+      return true;
+    } catch (BlobStorageException e) {
+      LOGGER.error("Exception thrown while uploading to destination via Blob API (dstUri={}, errorStatus={})", dstUri,
+          e.getStatusCode(), e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Upload via DFS (Data Lake) API. Does NOT support Blob Soft Delete.
+   * Kept as fallback for backward compatibility when BlobContainerClient is 
not initialized.
+   */
+  private boolean copyInputStreamToDstViaDfs(InputStream inputStream, URI 
dstUri, String path, byte[] contentMd5)
       throws IOException {
     int bytesRead;
     long totalBytesRead = 0;
     byte[] buffer = new byte[BUFFER_SIZE];
-    // TODO: the newer client now has the API 'uploadFromFile' that directly 
takes the file as an input. We can replace
-    // this upload logic with the 'uploadFromFile'/
     DataLakeFileClient fileClient;
     try {
-      fileClient = 
_fileSystemClient.createFile(AzurePinotFSUtil.convertUriToAzureStylePath(dstUri));
+      fileClient = _fileSystemClient.createFile(path);
     } catch (DataLakeStorageException e) {
-      // If the path already exists, doing nothing and return true
       if (e.getStatusCode() == ALREADY_EXISTS_STATUS_CODE && 
e.getErrorCode().equals(PATH_ALREADY_EXISTS_ERROR_CODE)) {
         LOGGER.info("The destination path already exists and we are 
overwriting the file (dstUri={})", dstUri);
-        fileClient = 
_fileSystemClient.createFile(AzurePinotFSUtil.convertUriToAzureStylePath(dstUri),
 true);
+        fileClient = _fileSystemClient.createFile(path, true);
       } else {
         LOGGER.error("Exception thrown while calling copy stream to 
destination (dstUri={}, errorStatus ={})", dstUri,
             e.getStatusCode(), e);
@@ -663,7 +732,6 @@ public class ADLSGen2PinotFS extends BasePinotFS {
       }
     }
 
-    // Update MD5 metadata
     if (contentMd5 != null) {
       PathHttpHeaders pathHttpHeaders = 
getPathHttpHeaders(fileClient.getProperties());
       pathHttpHeaders.setContentMd5(contentMd5);
@@ -674,21 +742,17 @@ public class ADLSGen2PinotFS extends BasePinotFS {
       while ((bytesRead = inputStream.read(buffer)) != -1) {
         byte[] md5BlockHash = null;
         if (_enableChecksum) {
-          // Compute md5 for the current block
           MessageDigest md5Block = MessageDigest.getInstance("MD5");
           md5Block.update(buffer, 0, bytesRead);
           md5BlockHash = md5Block.digest();
         }
-        // Upload 4MB at a time since Azure's limit for each append call is 
4MB.
         ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(buffer, 0, bytesRead);
         fileClient.appendWithResponse(byteArrayInputStream, totalBytesRead, 
bytesRead, md5BlockHash, null, null,
             Context.NONE);
         byteArrayInputStream.close();
         totalBytesRead += bytesRead;
       }
-      // Call flush on ADLS Gen 2
       fileClient.flush(totalBytesRead, true);
-
       return true;
     } catch (DataLakeStorageException | NoSuchAlgorithmException e) {
       throw new IOException(e);
diff --git 
a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
 
b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
index 4202ae7fdfd..05f82181446 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
@@ -21,6 +21,9 @@ package org.apache.pinot.plugin.filesystem.test;
 import com.azure.core.http.rest.PagedIterable;
 import com.azure.core.http.rest.SimpleResponse;
 import com.azure.core.util.Context;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobStorageException;
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
@@ -88,6 +91,10 @@ public class ADLSGen2PinotFSTest {
   private PagedIterable _mockPagedIterable;
   @Mock
   private PathItem _mockPathItem;
+  @Mock
+  private BlobContainerClient _mockBlobContainerClient;
+  @Mock
+  private BlobClient _mockBlobClient;
 
   private URI _mockURI;
   private ADLSGen2PinotFS _adlsGen2PinotFsUnderTest;
@@ -106,7 +113,7 @@ public class ADLSGen2PinotFSTest {
   public void tearDown() {
     verifyNoMoreInteractions(_mockDataLakeStorageException, 
_mockServiceClient, _mockFileSystemClient,
         _mockSimpleResponse, _mockDirectoryClient, _mockPathItem, 
_mockPagedIterable, _mockPathProperties,
-        _mockFileClient, _mockFileOpenInputStreamResult, _mockInputStream);
+        _mockFileClient, _mockFileOpenInputStreamResult, _mockInputStream, 
_mockBlobContainerClient, _mockBlobClient);
   }
 
   @Test(expectedExceptions = NullPointerException.class)
@@ -613,4 +620,59 @@ public class ADLSGen2PinotFSTest {
       }
     }
   }
+
+  @Test
+  public void testCopyFromLocalFileViaBlobApi() throws Exception {
+    // Instance wired with a Blob container client, so the upload must go through the Blob API
+    ADLSGen2PinotFS fs = new ADLSGen2PinotFS(_mockFileSystemClient, _mockBlobContainerClient);
+    byte[] payload = "test segment data".getBytes();
+    File localFile = File.createTempFile("pinot_blob_test", ".tmp");
+    Files.write(localFile.toPath(), payload);
+
+    URI dstUri = new URI("adl2://account/container/test_segment");
+    String blobPath = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+    long expectedLength = payload.length;
+
+    when(_mockBlobContainerClient.getBlobClient(blobPath)).thenReturn(_mockBlobClient);
+    doNothing().when(_mockBlobClient).upload(any(InputStream.class), eq(expectedLength), eq(true));
+
+    try {
+      fs.copyFromLocalFile(localFile, dstUri);
+
+      verify(_mockBlobContainerClient).getBlobClient(blobPath);
+      verify(_mockBlobClient).upload(any(InputStream.class), eq(expectedLength), eq(true));
+    } finally {
+      FileUtils.deleteQuietly(localFile);
+    }
+  }
+
+  @Test
+  public void testCopyFromLocalFileViaBlobApiWithException() throws Exception {
+    // A Blob service error during upload must surface to the caller as an IOException
+    ADLSGen2PinotFS fs = new ADLSGen2PinotFS(_mockFileSystemClient, _mockBlobContainerClient);
+    byte[] payload = "test segment data".getBytes();
+    File localFile = File.createTempFile("pinot_blob_test", ".tmp");
+    Files.write(localFile.toPath(), payload);
+
+    URI dstUri = new URI("adl2://account/container/test_segment");
+    String blobPath = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+    long expectedLength = payload.length;
+
+    BlobStorageException serviceError = mock(BlobStorageException.class);
+    when(serviceError.getStatusCode()).thenReturn(500);
+    when(_mockBlobContainerClient.getBlobClient(blobPath)).thenReturn(_mockBlobClient);
+    doThrow(serviceError).when(_mockBlobClient).upload(any(InputStream.class), eq(expectedLength), eq(true));
+
+    try {
+      expectThrows(IOException.class, () -> fs.copyFromLocalFile(localFile, dstUri));
+
+      verify(_mockBlobContainerClient).getBlobClient(blobPath);
+      verify(_mockBlobClient).upload(any(InputStream.class), eq(expectedLength), eq(true));
+      verify(serviceError).getStatusCode();
+    } finally {
+      FileUtils.deleteQuietly(localFile);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to