jfrazee commented on a change in pull request #5241:
URL: https://github.com/apache/nifi/pull/5241#discussion_r693395440



##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
         return propertyDescriptors;
     }
 
-    protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException{
-        // In the future, this method will be replaced by calling createItems API
-        // for example, this.container.createItems(records);
-        // currently, no createItems API available in Azure Cosmos Java SDK
+    private void insertWithTransactionalBatch(final List<Map<String, Object>> records, String partitionKeyField) throws CosmosException, ProcessException {
         final ComponentLog logger = getLogger();
         final CosmosContainer container = getContainer();
-        for (Map<String, Object> record : records){
+        if (records.size() > 0) {
+            final Map<String, Object> firstRecord = records.get(0);
+            final String recordPartitionKeyValue = (String)firstRecord.get(partitionKeyField);
+            final TransactionalBatch tBatch = TransactionalBatch.createTransactionalBatch(new PartitionKey(recordPartitionKeyValue));
+            for (Map<String, Object> record : records) {
+                if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+                    tBatch.upsertItemOperation(record);
+                } else {
+                    tBatch.createItemOperation(record);
+                }
+            }
             try {
-                container.createItem(record);
-            } catch (ConflictException e) {
-                // insert with unique id is expected. In case conflict occurs, use the selected strategy.
+                final TransactionalBatchResponse response = container.executeTransactionalBatch(tBatch);
+                if (!response.isSuccessStatusCode()) {
+                    logger.error("TransactionalBatchResponse status code: " +  response.getStatusCode());

Review comment:
       Seems like we should move this after the 409 check since we're not 
treating that as an error. Also, try to use the `{}` string interpolation for 
log messages instead of concatenation.
   ```suggestion
                       logger.error("TransactionalBatchResponse status code: {}", response.getStatusCode());
   ```
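
A minimal sketch of the reordered check suggested above. It assumes that the status code, the response's error message, and whether the Ignore conflict strategy is selected are already known. `BatchStatusCheck` and `failureMessage` are hypothetical names, not NiFi or Azure Cosmos SDK APIs, and the 2xx test stands in for `isSuccessStatusCode()`. Because the 409 check runs first, an ignored conflict returns before anything is logged. The caller then logs (with `{}` placeholders) only when a message actually comes back.

```java
import java.util.Optional;

/**
 * Hypothetical helper (not part of NiFi or the Cosmos SDK) mirroring the
 * status handling in insertWithTransactionalBatch, with the 409 check moved
 * ahead of any error logging.
 */
public final class BatchStatusCheck {
    static final int CONFLICT = 409;

    private BatchStatusCheck() {
    }

    /**
     * Returns the error message to fail with, or empty when the batch
     * succeeded or the conflict is deliberately ignored.
     */
    static Optional<String> failureMessage(final int statusCode,
                                           final String errorMessage,
                                           final boolean ignoreConflicts) {
        // 2xx: assumed equivalent of response.isSuccessStatusCode()
        if (statusCode >= 200 && statusCode < 300) {
            return Optional.empty();
        }
        // Check 409 first so an ignored conflict is never logged as an error
        if (statusCode == CONFLICT && ignoreConflicts) {
            return Optional.empty();
        }
        // Fall back to the status code when the response carries no message
        return Optional.of(errorMessage != null
                ? errorMessage
                : "TransactionalBatchResponse status code: " + statusCode);
    }

    public static void main(final String[] args) {
        // The caller would log here, e.g. logger.error("... status code: {}", statusCode)
        failureMessage(409, null, true).ifPresent(System.out::println);  // prints nothing
        failureMessage(413, null, true).ifPresent(System.out::println);  // prints "TransactionalBatchResponse status code: 413"
    }
}
```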

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -142,10 +186,58 @@ protected void bulkInsert(final List<Map<String, Object>> records) throws Cosmos
                         logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
                     }
                 }
+            } else {
+                throw e;
             }
         }
     }
 
+    private void chooseInsertMethodAndRun(final List<Map<String, Object>> bin, String partitionKeyField) throws CosmosException, ProcessException {
+        final ComponentLog logger = getLogger();
+        try {
+            if (bin.size() == 1) {
+                insertRecord(bin.get(0), partitionKeyField);
+            } else {
+                insertWithTransactionalBatch(bin, partitionKeyField);
+            }
+        } catch (CosmosException e) {
+            final String errMsg = String.format("CosmosException status code is %d while handling bin size = %d", e.getStatusCode(), bin.size());
+            logger.error(errMsg);

Review comment:
       ```suggestion
               logger.error("CosmosException status code is {} while handling bin size = {}", e.getStatusCode(), bin.size());
   ```
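
The batch path builds a single `PartitionKey` from `records.get(0)`, so every record in a `bin` handed to `chooseInsertMethodAndRun` must share one partition key value. This sketch shows one way to guarantee that: group records by the key, then cap each group's size. It assumes records arrive as `Map<String, Object>` and are binned before that call. `PartitionKeyBinner` and `bin` are hypothetical names. Azure documents a limit of 100 operations per transactional batch, so a real `maxBinSize` should not exceed that.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: split records into bins that each share one partition
 * key value and hold at most maxBinSize records, so each bin can become one
 * transactional batch.
 */
public final class PartitionKeyBinner {
    private PartitionKeyBinner() {
    }

    static List<List<Map<String, Object>>> bin(final List<Map<String, Object>> records,
                                               final String partitionKeyField,
                                               final int maxBinSize) {
        if (maxBinSize < 1) {
            throw new IllegalArgumentException("maxBinSize must be at least 1");
        }
        // Group by partition key value, keeping first-seen order
        final Map<String, List<Map<String, Object>>> byKey = new LinkedHashMap<>();
        for (final Map<String, Object> record : records) {
            final Object key = record.get(partitionKeyField);
            if (key == null) {
                throw new IllegalArgumentException("Record has no value for " + partitionKeyField);
            }
            byKey.computeIfAbsent(key.toString(), k -> new ArrayList<>()).add(record);
        }
        // Split each group into chunks of at most maxBinSize records
        final List<List<Map<String, Object>>> bins = new ArrayList<>();
        for (final List<Map<String, Object>> group : byKey.values()) {
            for (int start = 0; start < group.size(); start += maxBinSize) {
                final int end = Math.min(start + maxBinSize, group.size());
                bins.add(new ArrayList<>(group.subList(start, end)));
            }
        }
        return bins;
    }

    public static void main(final String[] args) {
        final List<Map<String, Object>> records = List.of(
                Map.of("id", "1", "category", "a"),
                Map.of("id", "2", "category", "b"),
                Map.of("id", "3", "category", "a"),
                Map.of("id", "4", "category", "a"));
        // A bin size of 2 keeps the output short; prints 2, 1, 1
        for (final List<Map<String, Object>> current : bin(records, "category", 2)) {
            System.out.println(current.size());
        }
    }
}
```

Grouping before chunking means a bin never mixes partition keys. A bin that ends up with a single record would still go through `insertRecord`.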

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -142,10 +186,58 @@ protected void bulkInsert(final List<Map<String, Object>> records) throws Cosmos
                         logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
                     }
                 }

Review comment:
       ```suggestion
                   } else if (logger.isDebugEnabled()) {
                        logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
                   }
   ```
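
The guard pays off mainly when building the message is costly. For a constant string like this one, it mostly keeps the style consistent. The sketch below uses `java.util.logging` as a stand-in for NiFi's `ComponentLog`: `Level.FINE` plays the role of debug, and `isLoggable` plays the role of `isDebugEnabled()`. `describeDuplicate` is a hypothetical message builder that counts how often it runs.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Illustrates the isDebugEnabled()-style guard with java.util.logging as a
 * stand-in for NiFi's ComponentLog.
 */
public final class DebugGuardDemo {
    static final Logger LOGGER = Logger.getLogger(DebugGuardDemo.class.getName());
    // Counts how often the (potentially costly) message is built
    static final AtomicInteger MESSAGES_BUILT = new AtomicInteger();

    private DebugGuardDemo() {
    }

    static String describeDuplicate(final String id) {
        MESSAGES_BUILT.incrementAndGet();
        return "Ignoring duplicate " + id + " based on selected conflict resolution strategy";
    }

    static void onConflictIgnored(final String id) {
        // FINE is roughly DEBUG; the message is only built when it would be logged
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(describeDuplicate(id));
        }
    }

    public static void main(final String[] args) {
        LOGGER.setLevel(Level.INFO);
        onConflictIgnored("record-1");
        System.out.println("messages built: " + MESSAGES_BUILT.get());  // prints "messages built: 0"
    }
}
```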

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
         return propertyDescriptors;
     }
 
-    protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException{
-        // In the future, this method will be replaced by calling createItems API
-        // for example, this.container.createItems(records);
-        // currently, no createItems API available in Azure Cosmos Java SDK
+    private void insertWithTransactionalBatch(final List<Map<String, Object>> records, String partitionKeyField) throws CosmosException, ProcessException {
         final ComponentLog logger = getLogger();

Review comment:
       Since we're doing this in several places we can probably just move this 
to the top level of the class.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
         return propertyDescriptors;
     }
 
-    protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException{
-        // In the future, this method will be replaced by calling createItems API
-        // for example, this.container.createItems(records);
-        // currently, no createItems API available in Azure Cosmos Java SDK
+    private void insertWithTransactionalBatch(final List<Map<String, Object>> records, String partitionKeyField) throws CosmosException, ProcessException {
         final ComponentLog logger = getLogger();
         final CosmosContainer container = getContainer();
-        for (Map<String, Object> record : records){
+        if (records.size() > 0) {
+            final Map<String, Object> firstRecord = records.get(0);
+            final String recordPartitionKeyValue = (String)firstRecord.get(partitionKeyField);
+            final TransactionalBatch tBatch = TransactionalBatch.createTransactionalBatch(new PartitionKey(recordPartitionKeyValue));
+            for (Map<String, Object> record : records) {
+                if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+                    tBatch.upsertItemOperation(record);
+                } else {
+                    tBatch.createItemOperation(record);
+                }
+            }
             try {
-                container.createItem(record);
-            } catch (ConflictException e) {
-                // insert with unique id is expected. In case conflict occurs, use the selected strategy.
+                final TransactionalBatchResponse response = container.executeTransactionalBatch(tBatch);
+                if (!response.isSuccessStatusCode()) {
+                    logger.error("TransactionalBatchResponse status code: " +  response.getStatusCode());
+                    if (response.getStatusCode() == 409) {
+                        if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(IGNORE_CONFLICT.getValue())) {
+                            // ignore conflict
+                            return;
+                        }
+                    }
+                    String errMsg = response.getErrorMessage();
+                    if (errMsg == null) {
+                        errMsg = "TransactionalBatchResponse status code: " +  response.getStatusCode();
+                    }
+                    throw new ProcessException(errMsg);

Review comment:
       I think we should handle a wide variety of response error statuses 
before throwing a `ProcessException`. E.g., 413 (Entity too large) is unlikely 
to resolve itself by just retrying.
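
One possible shape for that, sketched as a hypothetical `StatusDisposition.classify` helper. The mapping is an assumption pieced together from this review:

- 429 and 410 yield, as in the PR's `onTrigger`.
- 409 defers to the conflict strategy.
- Anything else, including 413, is routed to failure rather than retried.

A real implementation would likely add other transient codes.

```java
import java.util.Set;

/**
 * Hypothetical classification of Cosmos DB status codes into processor
 * outcomes. The mapping is an assumption based on this review, not an
 * Azure or NiFi API.
 */
public final class StatusDisposition {
    enum Outcome { SUCCESS, CONFLICT, YIELD_AND_RETRY, FAIL }

    // Statuses this PR already treats as transient in onTrigger
    private static final Set<Integer> RETRYABLE = Set.of(429, 410);

    private StatusDisposition() {
    }

    static Outcome classify(final int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return Outcome.SUCCESS;
        }
        if (statusCode == 409) {
            // Let the configured conflict strategy decide (ignore vs. fail)
            return Outcome.CONFLICT;
        }
        if (RETRYABLE.contains(statusCode)) {
            return Outcome.YIELD_AND_RETRY;
        }
        // Anything not listed above (e.g. 413 Entity Too Large) goes to failure instead of being retried
        return Outcome.FAIL;
    }

    public static void main(final String[] args) {
        for (final int code : new int[] {200, 409, 413, 429}) {
            // prints "200 -> SUCCESS", "409 -> CONFLICT", "413 -> FAIL", "429 -> YIELD_AND_RETRY"
            System.out.println(code + " -> " + classify(code));
        }
    }
}
```

The processor would then map `YIELD_AND_RETRY` to `context.yield()` and `FAIL` to a transfer to failure, so only genuinely transient errors are retried.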

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -189,18 +281,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 }
                 batch.add(contentMap);
                 if (batch.size() == ceiling) {
-                    bulkInsert(batch);
+                    bulkInsert(batch, partitionKeyField);
                     batch = new ArrayList<>();
                 }
             }
             if (!error && batch.size() > 0) {
-                bulkInsert(batch);
+                bulkInsert(batch, partitionKeyField);
+            }
+        } catch (CosmosException e)  {

Review comment:
       ```suggestion
           } catch (final CosmosException e)  {
   ```

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
         return propertyDescriptors;
     }
 
-    protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException{
-        // In the future, this method will be replaced by calling createItems API
-        // for example, this.container.createItems(records);
-        // currently, no createItems API available in Azure Cosmos Java SDK
+    private void insertWithTransactionalBatch(final List<Map<String, Object>> records, String partitionKeyField) throws CosmosException, ProcessException {

Review comment:
       ```suggestion
       private void insertWithTransactionalBatch(final List<Map<String, Object>> records, final String partitionKeyField) throws CosmosException, ProcessException {
   ```

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
         return propertyDescriptors;
     }
 
-    protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException{
-        // In the future, this method will be replaced by calling createItems API
-        // for example, this.container.createItems(records);
-        // currently, no createItems API available in Azure Cosmos Java SDK
+    private void insertWithTransactionalBatch(final List<Map<String, Object>> records, String partitionKeyField) throws CosmosException, ProcessException {
         final ComponentLog logger = getLogger();
         final CosmosContainer container = getContainer();
-        for (Map<String, Object> record : records){
+        if (records.size() > 0) {
+            final Map<String, Object> firstRecord = records.get(0);
+            final String recordPartitionKeyValue = (String)firstRecord.get(partitionKeyField);
+            final TransactionalBatch tBatch = TransactionalBatch.createTransactionalBatch(new PartitionKey(recordPartitionKeyValue));
+            for (Map<String, Object> record : records) {
+                if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+                    tBatch.upsertItemOperation(record);
+                } else {
+                    tBatch.createItemOperation(record);
+                }
+            }
             try {
-                container.createItem(record);
-            } catch (ConflictException e) {
-                // insert with unique id is expected. In case conflict occurs, use the selected strategy.
+                final TransactionalBatchResponse response = container.executeTransactionalBatch(tBatch);
+                if (!response.isSuccessStatusCode()) {
+                    logger.error("TransactionalBatchResponse status code: " +  response.getStatusCode());
+                    if (response.getStatusCode() == 409) {
+                        if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(IGNORE_CONFLICT.getValue())) {
+                            // ignore conflict
+                            return;
+                        }
+                    }
+                    String errMsg = response.getErrorMessage();
+                    if (errMsg == null) {
+                        errMsg = "TransactionalBatchResponse status code: " +  response.getStatusCode();
+                    }
+                    throw new ProcessException(errMsg);
+                }
+            } catch (CosmosException e) {
+                logger.error("batchResponse-> statusCode: " + e.getStatusCode() + ", subStatusCode: "  + e.getSubStatusCode());

Review comment:
       ```suggestion
                   logger.error("batchResponse-> statusCode: {}, subStatusCode: {}", e.getStatusCode(), e.getSubStatusCode());
   ```

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -142,10 +186,58 @@ protected void bulkInsert(final List<Map<String, Object>> records) throws Cosmos
                         logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
                     }
                 }
+            } else {
+                throw e;
             }
         }
     }
 
+    private void chooseInsertMethodAndRun(final List<Map<String, Object>> bin, String partitionKeyField) throws CosmosException, ProcessException {
+        final ComponentLog logger = getLogger();
+        try {
+            if (bin.size() == 1) {
+                insertRecord(bin.get(0), partitionKeyField);
+            } else {
+                insertWithTransactionalBatch(bin, partitionKeyField);
+            }
+        } catch (CosmosException e) {
+            final String errMsg = String.format("CosmosException status code is %d while handling bin size = %d", e.getStatusCode(), bin.size());
+            logger.error(errMsg);
+            if(e.getStatusCode() == 429) { // request being throttled

Review comment:
       ```suggestion
               if (e.getStatusCode() == 429) { // request being throttled
   ```

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -189,18 +281,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 }
                 batch.add(contentMap);
                 if (batch.size() == ceiling) {
-                    bulkInsert(batch);
+                    bulkInsert(batch, partitionKeyField);
                     batch = new ArrayList<>();
                 }
             }
             if (!error && batch.size() > 0) {
-                bulkInsert(batch);
+                bulkInsert(batch, partitionKeyField);
+            }
+        } catch (CosmosException e)  {
+            final int statusCode =  e.getStatusCode();
+            logger.error("statusCode: " + statusCode + ", subStatusCode: "  + e.getSubStatusCode());
+
+            if (statusCode == 429) {
+                logger.error("Failure due to server-side throttling. Increase RU setting for your workload");
+                yield = true;
+            } else if (statusCode == 410) {
+                logger.error("A request to change the throughput is currently in progress");
+                yield = true;
+            } else {
+                error = true;
             }
-        } catch (SchemaNotFoundException | MalformedRecordException | IOException | CosmosException e) {
+        } catch (IllegalTypeConversionException | ProcessException | SchemaNotFoundException | MalformedRecordException | IOException e) {
             logger.error("PutAzureCosmoDBRecord failed with error: {}", new Object[]{e.getMessage()}, e);

Review comment:
       I think some of the `logger.error()` above might be duplicated here 
since the error messages are coming from the exception. Do we need both?

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java
##########
@@ -36,7 +36,6 @@
             + " from Azure Portal (Overview->URI)")
         .required(false)
         .addValidator(StandardValidators.URI_VALIDATOR)
-        .sensitive(true)

Review comment:
       We probably can't make this change at this point unless you think it's 
critical that it be changed. Any existing flows will break because of this 
change.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -142,10 +186,58 @@ protected void bulkInsert(final List<Map<String, Object>> records) throws Cosmos
                         logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
                     }
                 }
+            } else {
+                throw e;
             }
         }
     }
 
+    private void chooseInsertMethodAndRun(final List<Map<String, Object>> bin, String partitionKeyField) throws CosmosException, ProcessException {
+        final ComponentLog logger = getLogger();
+        try {
+            if (bin.size() == 1) {
+                insertRecord(bin.get(0), partitionKeyField);
+            } else {
+                insertWithTransactionalBatch(bin, partitionKeyField);
+            }
+        } catch (CosmosException e) {

Review comment:
       ```suggestion
           } catch (final CosmosException e) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
         return propertyDescriptors;
     }
 
-    protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException{
-        // In the future, this method will be replaced by calling createItems API
-        // for example, this.container.createItems(records);
-        // currently, no createItems API available in Azure Cosmos Java SDK
+    private void insertWithTransactionalBatch(final List<Map<String, Object>> records, String partitionKeyField) throws CosmosException, ProcessException {
         final ComponentLog logger = getLogger();
         final CosmosContainer container = getContainer();
-        for (Map<String, Object> record : records){
+        if (records.size() > 0) {
+            final Map<String, Object> firstRecord = records.get(0);
+            final String recordPartitionKeyValue = (String)firstRecord.get(partitionKeyField);
+            final TransactionalBatch tBatch = TransactionalBatch.createTransactionalBatch(new PartitionKey(recordPartitionKeyValue));
+            for (Map<String, Object> record : records) {
+                if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+                    tBatch.upsertItemOperation(record);
+                } else {
+                    tBatch.createItemOperation(record);
+                }
+            }
             try {
-                container.createItem(record);
-            } catch (ConflictException e) {
-                // insert with unique id is expected. In case conflict occurs, use the selected strategy.
+                final TransactionalBatchResponse response = container.executeTransactionalBatch(tBatch);
+                if (!response.isSuccessStatusCode()) {
+                    logger.error("TransactionalBatchResponse status code: " +  response.getStatusCode());
+                    if (response.getStatusCode() == 409) {
+                        if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(IGNORE_CONFLICT.getValue())) {
+                            // ignore conflict
+                            return;
+                        }
+                    }
+                    String errMsg = response.getErrorMessage();
+                    if (errMsg == null) {
+                        errMsg = "TransactionalBatchResponse status code: " +  response.getStatusCode();
+                    }
+                    throw new ProcessException(errMsg);
+                }
+            } catch (CosmosException e) {

Review comment:
       ```suggestion
               } catch (final CosmosException e) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -189,18 +281,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 }
                 batch.add(contentMap);
                 if (batch.size() == ceiling) {
-                    bulkInsert(batch);
+                    bulkInsert(batch, partitionKeyField);
                     batch = new ArrayList<>();
                 }
             }
             if (!error && batch.size() > 0) {
-                bulkInsert(batch);
+                bulkInsert(batch, partitionKeyField);
+            }
+        } catch (CosmosException e)  {
+            final int statusCode =  e.getStatusCode();
+            logger.error("statusCode: " + statusCode + ", subStatusCode: "  + e.getSubStatusCode());

Review comment:
       ```suggestion
               logger.error("statusCode: {}, subStatusCode: {} ", statusCode, e.getSubStatusCode());
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
