sjyang18 commented on a change in pull request #5241:
URL: https://github.com/apache/nifi/pull/5241#discussion_r703941601
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
return propertyDescriptors;
}
- protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException{
- // In the future, this method will be replaced by calling createItems API
- // for example, this.container.createItems(records);
- // currently, no createItems API available in Azure Cosmos Java SDK
+ private void insertWithTransactionalBatch(final List<Map<String, Object>> records, String partitionKeyField) throws CosmosException, ProcessException {
final ComponentLog logger = getLogger();
final CosmosContainer container = getContainer();
- for (Map<String, Object> record : records){
+ if (records.size() > 0) {
+ final Map<String, Object> firstRecord = records.get(0);
+ final String recordPartitionKeyValue = (String)firstRecord.get(partitionKeyField);
+ final TransactionalBatch tBatch = TransactionalBatch.createTransactionalBatch(new PartitionKey(recordPartitionKeyValue));
+ for (Map<String, Object> record : records) {
+ if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+ tBatch.upsertItemOperation(record);
+ } else {
+ tBatch.createItemOperation(record);
+ }
+ }
try {
- container.createItem(record);
- } catch (ConflictException e) {
- // insert with unique id is expected. In case conflict occurs, use the selected strategy.
+ final TransactionalBatchResponse response = container.executeTransactionalBatch(tBatch);
+ if (!response.isSuccessStatusCode()) {
+ logger.error("TransactionalBatchResponse status code: " + response.getStatusCode());
+ if (response.getStatusCode() == 409) {
+ if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(IGNORE_CONFLICT.getValue())) {
+ // ignore conflict
+ return;
+ }
+ }
+ String errMsg = response.getErrorMessage();
+ if (errMsg == null) {
+ errMsg = "TransactionalBatchResponse status code: " + response.getStatusCode();
+ }
+ throw new ProcessException(errMsg);
Review comment:
I intentionally moved the handling of the TransactionalBatch status code
into onTrigger so that it can be compared with the handling of the
CosmosException case. That way, the two paths are handled consistently.
Let me know if you think this is still an issue.
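
A minimal sketch of the idea, not the code in this PR: both paths (a
status code read from a batch response, and a status code taken from a
caught exception) go through one decision point. The class name, the
`resolve` helper, the `Outcome` enum, and the strategy strings are all
hypothetical and only stand in for the real allowable values.

```java
// Hypothetical illustration only; none of these names come from the PR.
class StatusHandlingSketch {
    // Placeholder strategy values (assumed, not the processor's real ones).
    static final String IGNORE = "ignore";
    static final String UPSERT = "upsert";
    static final int HTTP_CONFLICT = 409;

    enum Outcome { SUCCESS, IGNORED, FAILURE }

    // Single decision point shared by the response path and the exception path.
    static Outcome resolve(final int statusCode, final String conflictHandlingStrategy) {
        if (statusCode >= 200 && statusCode < 300) {
            return Outcome.SUCCESS;
        }
        // A conflict is only skipped when the "ignore" strategy is selected.
        if (statusCode == HTTP_CONFLICT && IGNORE.equals(conflictHandlingStrategy)) {
            return Outcome.IGNORED;
        }
        return Outcome.FAILURE;
    }

    public static void main(String[] args) {
        // Path 1: status code read from a batch response.
        System.out.println(resolve(409, IGNORE));
        // Path 2: status code taken from a caught exception, no strategy set.
        System.out.println(resolve(409, null));
        System.out.println(resolve(201, UPSERT));
    }
}
```

With a shared helper like this, the caller decides in one place whether a
409 routes to success, is skipped, or fails, whichever path produced it.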