This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 4824bfe SAMZA-2668: AzureBlobSystemProducer: using TokenCredential
throws ResourceNotFound 404 when establishing connection (#1516)
4824bfe is described below
commit 4824bfe32711a9f9e755c3b3aa8bd8877e9023d6
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Thu Aug 12 12:39:39 2021 -0700
SAMZA-2668: AzureBlobSystemProducer: using TokenCredential throws
ResourceNotFound 404 when establishing connection (#1516)
---
.../producer/AzureBlobSystemProducer.java | 30 ++++++++++++++--------
1 file changed, 19 insertions(+), 11 deletions(-)
diff --git
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index 781dce4..91927d2 100644
---
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -359,18 +359,26 @@ public class AzureBlobSystemProducer implements
SystemProducer {
}
void validateFlushThresholdSizeSupported(BlobServiceAsyncClient
storageClient) {
- SkuName accountType = storageClient.getAccountInfo().block().getSkuName();
- String accountName = storageClient.getAccountName();
long flushThresholdSize = config.getMaxFlushThresholdSize(systemName);
- boolean isPremiumAccount = SkuName.PREMIUM_LRS == accountType;
- if (isPremiumAccount && flushThresholdSize > PREMIUM_MAX_BLOCK_SIZE) { //
100 MB
- throw new SystemProducerException("Azure storage account with name: " +
accountName
- + " is a premium account and can only handle upto " +
PREMIUM_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
- + flushThresholdSize);
- } else if (!isPremiumAccount && flushThresholdSize >
STANDARD_MAX_BLOCK_SIZE) { // STANDARD account
- throw new SystemProducerException(
- "Azure storage account with name: " + accountName + " is a standard
account and can only handle upto " + STANDARD_MAX_BLOCK_SIZE + " threshold
size. Given flush threshold size is "
- + flushThresholdSize);
+ try {
+ SkuName accountType =
storageClient.getAccountInfo().block().getSkuName();
+ String accountName = storageClient.getAccountName();
+ boolean isPremiumAccount = SkuName.PREMIUM_LRS == accountType;
+ if (isPremiumAccount && flushThresholdSize > PREMIUM_MAX_BLOCK_SIZE) {
// 100 MB
+ throw new SystemProducerException("Azure storage account with name: "
+ accountName + " is a premium account and can only handle upto "
+ + PREMIUM_MAX_BLOCK_SIZE + " threshold size. Given flush threshold
size is " + flushThresholdSize);
+ } else if (!isPremiumAccount && flushThresholdSize >
STANDARD_MAX_BLOCK_SIZE) { // STANDARD account
+ throw new SystemProducerException(
+ "Azure storage account with name: " + accountName + " is a
standard account and can only handle upto " + STANDARD_MAX_BLOCK_SIZE + "
threshold size. Given flush threshold size is "
+ + flushThresholdSize);
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception encountered while trying to ensure that the given
flush threshold size is "
+ + "supported by the desired azure blob storage account. "
+ + "{} is the given account name and {} is the flush threshold
size.",
+ config.getAzureAccountName(systemName), flushThresholdSize);
+ LOG.warn("SystemProducer will continue and send messages to Azure Blob
Storage but they might fail "
+ + "if the given threshold size is not supported by the account");
}
}