This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a94cadae New interface & factory for AzureHTTPClient providers (#1700)
2a94cadae is described below

commit 2a94cadae5e1884c5d65526b9f2251dcb23bda90
Author: Ayush Khandelwal <[email protected]>
AuthorDate: Fri Aug 23 03:40:04 2024 +0530

    New interface & factory for AzureHTTPClient providers (#1700)
    
    * New interface for AzureHTTPClient providers
    
    * Added license headers
    
    * Updated method spec definitions
    
    * Updated spec headers doc
    
    * Updated config property in readme file
    
    * Modified doc of method
    
    * Fixed checkstyle failures
---
 .../versioned/jobs/samza-configurations.md         |  1 +
 .../system/azureblob/AzureBlobClientBuilder.java   |  2 +-
 .../azureblob/AzureBlobClientBuilderFactory.java   | 31 ++++++++++++++++++
 .../samza/system/azureblob/AzureBlobConfig.java    |  7 ++++
 .../samza/system/azureblob/BlobClientBuilder.java  | 38 ++++++++++++++++++++++
 .../system/azureblob/BlobClientBuilderFactory.java | 35 ++++++++++++++++++++
 .../producer/AzureBlobSystemProducer.java          | 14 ++++++--
 7 files changed, 124 insertions(+), 4 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md 
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 6188e90b4..9ccb4192e 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -264,6 +264,7 @@ Configs for producing to [Azure Blob 
Storage](https://azure.microsoft.com/en-us/
 |systems.**_system-name_**.azureblob.proxy.hostname| |if proxy.use is true 
then host name of proxy.|
 |systems.**_system-name_**.azureblob.proxy.port| |if proxy.use is true then 
port of proxy.|
 
|systems.**_system-name_**.azureblob.writer.factory.class|`org.apache.samza.system.`<br>`azureblob.avro.`<br>`AzureBlobAvroWriterFactory`|Fully
 qualified class name of the 
`org.apache.samza.system.azureblob.producer.AzureBlobWriter` impl for the 
system producer.<br><br>The default writer creates blobs that are of type AVRO 
and require the messages sent to a blob to be AVRO records. The blobs created 
by the default writer are of type [Block 
Blobs](https://docs.microsoft.com/en-us/rest/api [...]
+|systems.**_system-name_**.azureblob.azureclientbuilder.factory.class | 
`org.apache.samza.system.`<br>`azureblob.AzureBlobClientBuilderFactory`|Fully 
qualified class name of the factory that creates the 
`org.apache.samza.system.azureblob.BlobClientBuilder` implementation used by 
the system producer. |
 |systems.**_system-name_**.azureblob.compression.type|"none"|type of 
compression to be used before uploading blocks. Can be "none" or "gzip".|
 |systems.**_system-name_**.azureblob.maxFlushThresholdSize|10485760 (10 
MB)|max size of the uncompressed block to be uploaded in bytes. Maximum size 
allowed by Azure is 100MB.|
 |systems.**_system-name_**.azureblob.maxBlobSize|Long.MAX_VALUE 
(unlimited)|max size of the uncompressed blob in bytes.<br>If default value 
then size is unlimited capped only by Azure BlockBlob size of 4.75 TB (100 MB 
per block X 50,000 blocks).|
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java
index a8a6eb2df..479733e31 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java
@@ -39,7 +39,7 @@ import java.util.Locale;
  * configs given to the SystemProducer - such as which authentication method 
to use, whether to use proxy to authenticate,
  * and so on.
  */
-public final class AzureBlobClientBuilder {
+public final class AzureBlobClientBuilder implements BlobClientBuilder {
   private final String systemName;
   private final String azureUrlFormat;
   private final AzureBlobConfig azureBlobConfig;
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java
new file mode 100644
index 000000000..7a5120c11
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilderFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.azureblob;
+
+/**
+ * Default implementation of {@link BlobClientBuilderFactory} that constructs a
+ * new instance of {@link AzureBlobClientBuilder}.
+ */
+public class AzureBlobClientBuilderFactory implements BlobClientBuilderFactory 
{
+  @Override
+  public BlobClientBuilder getBlobClientBuilder(String systemName, String 
azureUrlFormat,
+      AzureBlobConfig azureBlobConfig) {
+    return new AzureBlobClientBuilder(systemName, azureUrlFormat, 
azureBlobConfig);
+  }
+}
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
index 37c6ebcf3..bedf43370 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
@@ -38,6 +38,8 @@ public class AzureBlobConfig extends MapConfig {
   // fully qualified class name of the AzureBlobWriter impl for the producer 
system
   public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME = 
SYSTEM_AZUREBLOB_PREFIX + "writer.factory.class";
   public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT = 
"org.apache.samza.system.azureblob.avro.AzureBlobAvroWriterFactory";
+  public static final String SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME = 
SYSTEM_AZUREBLOB_PREFIX + "azureclientbuilder.factory.class";
+  public static final String 
SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME_DEFAULT = 
"org.apache.samza.system.azureblob.AzureBlobClientBuilderFactory";
 
   public static final String SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION = 
Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + 
"useTokenCredentialAuthentication";
   private static final boolean 
SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION_DEFAULT = false;
@@ -172,6 +174,11 @@ public class AzureBlobConfig extends MapConfig {
     return get(String.format(SYSTEM_WRITER_FACTORY_CLASS_NAME, systemName), 
SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT);
   }
 
+  public String getAzureBlobClientBuilderFactoryClassName(String systemName) {
+    return get(String.format(SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME, 
systemName),
+        SYSTEM_BLOB_CLIENT_BUILDER_FACTORY_CLASS_NAME_DEFAULT);
+  }
+
   public int getMaxFlushThresholdSize(String systemName) {
     return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), 
SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT);
   }
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java
new file mode 100644
index 000000000..f76c3d955
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.azureblob;
+
+import com.azure.storage.blob.BlobServiceAsyncClient;
+
+
+/**
+ * Creates a BlobServiceAsyncClient. The implementation controls construction of 
the underlying client.
+ * Customers implementing their own System Producer need to ensure the thread 
safety of their implementation when generating the client.
+ * If org.apache.samza.system.azureblob.producer.AzureBlobSystemProducer is 
used, by default it allows only one thread to create the client.
+ * Please ensure that any implementation of this interface is thread safe 
as well.
+ * AzureBlobSystemProducer also safely closes the client when stop() is 
called. Please make sure to close clients if you use this interface
+ * to create your own client.
+ */
+public interface BlobClientBuilder {
+  /**
+   * Create a client for uploading to Azure Blob Storage
+   * @return New instance of {@link BlobServiceAsyncClient}
+   */
+  BlobServiceAsyncClient getBlobServiceAsyncClient();
+}
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java
new file mode 100644
index 000000000..00d4cf94d
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/BlobClientBuilderFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.azureblob;
+
+/**
+ * Constructs a new instance of type {@link BlobClientBuilder}.
+ * Implementation controls construction of underlying instance.
+ */
+public interface BlobClientBuilderFactory {
+  /**
+   * Create a new instance of {@link BlobClientBuilder}
+   * @param systemName Name of the system for which the blob client builder is 
being created
+   * @param azureUrlFormat The format of the Azure URL, which should conform 
to Azure's URL formatting requirements.
+   * @param azureBlobConfig The configuration settings for Azure Blob, 
encapsulated in an {@link AzureBlobConfig} object.
+   *                        This includes metadata details for Azure Blob 
configs.
+   * @return New instance of {@link BlobClientBuilder}
+   */
+  BlobClientBuilder getBlobClientBuilder(String systemName, String 
azureUrlFormat, AzureBlobConfig azureBlobConfig);
+}
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index 2774dba41..9fd2b3b1e 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -44,8 +44,8 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemProducerException;
-import org.apache.samza.system.azureblob.AzureBlobClientBuilder;
 import org.apache.samza.system.azureblob.AzureBlobConfig;
+import org.apache.samza.system.azureblob.BlobClientBuilderFactory;
 import org.apache.samza.system.azureblob.compression.CompressionFactory;
 import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 import org.slf4j.Logger;
@@ -116,6 +116,7 @@ public class AzureBlobSystemProducer implements 
SystemProducer {
   // Map of writers indexed first by sourceName and then by (streamName, 
partitionName) or just streamName if partition key does not exist.
   private final Map<String, Map<String, AzureBlobWriter>> writerMap;
   private final AzureBlobWriterFactory writerFactory;
+  private final BlobClientBuilderFactory clientFactory;
 
   private final int blockFlushThresholdSize;
   private final long flushTimeoutMs;
@@ -143,6 +144,13 @@ public class AzureBlobSystemProducer implements 
SystemProducer {
     this.systemName = systemName;
     this.config = config;
 
+    String clientFactoryClassName = 
this.config.getAzureBlobClientBuilderFactoryClassName(this.systemName);
+    try {
+      this.clientFactory = (BlobClientBuilderFactory) 
Class.forName(clientFactoryClassName).newInstance();
+    } catch (Exception e) {
+      throw new SystemProducerException("Could not create blob client factory 
with name " + clientFactoryClassName, e);
+    }
+
     String writerFactoryClassName = 
this.config.getAzureBlobWriterFactoryClassName(this.systemName);
     try {
       this.writerFactory = (AzureBlobWriterFactory) 
Class.forName(writerFactoryClassName).newInstance();
@@ -344,8 +352,8 @@ public class AzureBlobSystemProducer implements 
SystemProducer {
   @VisibleForTesting
   void setupAzureContainer() {
     try {
-      BlobServiceAsyncClient storageClient  = new 
AzureBlobClientBuilder(systemName, AZURE_URL, config)
-          .getBlobServiceAsyncClient();
+      BlobServiceAsyncClient storageClient =
+          clientFactory.getBlobClientBuilder(systemName, AZURE_URL, 
config).getBlobServiceAsyncClient();
       validateFlushThresholdSizeSupported(storageClient);
 
       containerAsyncClient = 
storageClient.getBlobContainerAsyncClient(systemName);

Reply via email to