jfrazee commented on a change in pull request #5241:
URL: https://github.com/apache/nifi/pull/5241#discussion_r693395440
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
return propertyDescriptors;
}
- protected void bulkInsert(final List<Map<String, Object>> records) throws
CosmosException{
- // In the future, this method will be replaced by calling createItems
API
- // for example, this.container.createItems(records);
- // currently, no createItems API available in Azure Cosmos Java SDK
+ private void insertWithTransactionalBatch(final List<Map<String, Object>>
records, String partitionKeyField) throws CosmosException, ProcessException {
final ComponentLog logger = getLogger();
final CosmosContainer container = getContainer();
- for (Map<String, Object> record : records){
+ if (records.size() > 0) {
+ final Map<String, Object> firstRecord = records.get(0);
+ final String recordPartitionKeyValue =
(String)firstRecord.get(partitionKeyField);
+ final TransactionalBatch tBatch =
TransactionalBatch.createTransactionalBatch(new
PartitionKey(recordPartitionKeyValue));
+ for (Map<String, Object> record : records) {
+ if (conflictHandlingStrategy != null &&
conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+ tBatch.upsertItemOperation(record);
+ } else {
+ tBatch.createItemOperation(record);
+ }
+ }
try {
- container.createItem(record);
- } catch (ConflictException e) {
- // insert with unique id is expected. In case conflict occurs,
use the selected strategy.
+ final TransactionalBatchResponse response =
container.executeTransactionalBatch(tBatch);
+ if (!response.isSuccessStatusCode()) {
+ logger.error("TransactionalBatchResponse status code: " +
response.getStatusCode());
Review comment:
It seems we should move this log statement after the 409 check, since we
don't treat a 409 as an error when the conflict is being ignored. Also, please
use `{}` placeholders in log messages instead of string concatenation.
```suggestion
logger.error("TransactionalBatchResponse status code:
{}", response.getStatusCode());
```
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -142,10 +186,58 @@ protected void bulkInsert(final List<Map<String, Object>>
records) throws Cosmos
logger.debug("Ignoring duplicate based on selected
conflict resolution strategy");
}
}
+ } else {
+ throw e;
}
}
}
+ private void chooseInsertMethodAndRun(final List<Map<String, Object>> bin,
String partitionKeyField) throws CosmosException, ProcessException {
+ final ComponentLog logger = getLogger();
+ try {
+ if (bin.size() == 1) {
+ insertRecord(bin.get(0), partitionKeyField);
+ } else {
+ insertWithTransactionalBatch(bin, partitionKeyField);
+ }
+ } catch (CosmosException e) {
+ final String errMsg = String.format("CosmosException status code
is %d while handling bin size = %d", e.getStatusCode(), bin.size());
+ logger.error(errMsg);
Review comment:
```suggestion
logger.error("CosmosException status code is {} while handling
bin size = {}", e.getStatusCode(), bin.size());
```
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -142,10 +186,58 @@ protected void bulkInsert(final List<Map<String, Object>>
records) throws Cosmos
logger.debug("Ignoring duplicate based on selected
conflict resolution strategy");
}
}
Review comment:
```suggestion
} else if (logger.isDebugEnabled()) {
logger.debug("Ignoring duplicate based on selected
conflict resolution strategy");
}
```
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
return propertyDescriptors;
}
- protected void bulkInsert(final List<Map<String, Object>> records) throws
CosmosException{
- // In the future, this method will be replaced by calling createItems
API
- // for example, this.container.createItems(records);
- // currently, no createItems API available in Azure Cosmos Java SDK
+ private void insertWithTransactionalBatch(final List<Map<String, Object>>
records, String partitionKeyField) throws CosmosException, ProcessException {
final ComponentLog logger = getLogger();
Review comment:
Since we're calling `getLogger()` in several methods, we can probably just
move the logger to the top level of the class.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
return propertyDescriptors;
}
- protected void bulkInsert(final List<Map<String, Object>> records) throws
CosmosException{
- // In the future, this method will be replaced by calling createItems
API
- // for example, this.container.createItems(records);
- // currently, no createItems API available in Azure Cosmos Java SDK
+ private void insertWithTransactionalBatch(final List<Map<String, Object>>
records, String partitionKeyField) throws CosmosException, ProcessException {
final ComponentLog logger = getLogger();
final CosmosContainer container = getContainer();
- for (Map<String, Object> record : records){
+ if (records.size() > 0) {
+ final Map<String, Object> firstRecord = records.get(0);
+ final String recordPartitionKeyValue =
(String)firstRecord.get(partitionKeyField);
+ final TransactionalBatch tBatch =
TransactionalBatch.createTransactionalBatch(new
PartitionKey(recordPartitionKeyValue));
+ for (Map<String, Object> record : records) {
+ if (conflictHandlingStrategy != null &&
conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+ tBatch.upsertItemOperation(record);
+ } else {
+ tBatch.createItemOperation(record);
+ }
+ }
try {
- container.createItem(record);
- } catch (ConflictException e) {
- // insert with unique id is expected. In case conflict occurs,
use the selected strategy.
+ final TransactionalBatchResponse response =
container.executeTransactionalBatch(tBatch);
+ if (!response.isSuccessStatusCode()) {
+ logger.error("TransactionalBatchResponse status code: " +
response.getStatusCode());
+ if (response.getStatusCode() == 409) {
+ if (conflictHandlingStrategy != null &&
conflictHandlingStrategy.equals(IGNORE_CONFLICT.getValue())) {
+ // ignore conflict
+ return;
+ }
+ }
+ String errMsg = response.getErrorMessage();
+ if (errMsg == null) {
+ errMsg = "TransactionalBatchResponse status code: " +
response.getStatusCode();
+ }
+ throw new ProcessException(errMsg);
Review comment:
I think we should handle a wider variety of response error statuses
before throwing a `ProcessException`. E.g., a 413 (Request Entity Too Large)
is unlikely to resolve itself by just retrying.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -189,18 +281,33 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
batch.add(contentMap);
if (batch.size() == ceiling) {
- bulkInsert(batch);
+ bulkInsert(batch, partitionKeyField);
batch = new ArrayList<>();
}
}
if (!error && batch.size() > 0) {
- bulkInsert(batch);
+ bulkInsert(batch, partitionKeyField);
+ }
+ } catch (CosmosException e) {
Review comment:
```suggestion
} catch (final CosmosException e) {
```
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
return propertyDescriptors;
}
- protected void bulkInsert(final List<Map<String, Object>> records) throws
CosmosException{
- // In the future, this method will be replaced by calling createItems
API
- // for example, this.container.createItems(records);
- // currently, no createItems API available in Azure Cosmos Java SDK
+ private void insertWithTransactionalBatch(final List<Map<String, Object>>
records, String partitionKeyField) throws CosmosException, ProcessException {
Review comment:
```suggestion
private void insertWithTransactionalBatch(final List<Map<String,
Object>> records, final String partitionKeyField) throws CosmosException,
ProcessException {
```
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
return propertyDescriptors;
}
- protected void bulkInsert(final List<Map<String, Object>> records) throws
CosmosException{
- // In the future, this method will be replaced by calling createItems
API
- // for example, this.container.createItems(records);
- // currently, no createItems API available in Azure Cosmos Java SDK
+ private void insertWithTransactionalBatch(final List<Map<String, Object>>
records, String partitionKeyField) throws CosmosException, ProcessException {
final ComponentLog logger = getLogger();
final CosmosContainer container = getContainer();
- for (Map<String, Object> record : records){
+ if (records.size() > 0) {
+ final Map<String, Object> firstRecord = records.get(0);
+ final String recordPartitionKeyValue =
(String)firstRecord.get(partitionKeyField);
+ final TransactionalBatch tBatch =
TransactionalBatch.createTransactionalBatch(new
PartitionKey(recordPartitionKeyValue));
+ for (Map<String, Object> record : records) {
+ if (conflictHandlingStrategy != null &&
conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+ tBatch.upsertItemOperation(record);
+ } else {
+ tBatch.createItemOperation(record);
+ }
+ }
try {
- container.createItem(record);
- } catch (ConflictException e) {
- // insert with unique id is expected. In case conflict occurs,
use the selected strategy.
+ final TransactionalBatchResponse response =
container.executeTransactionalBatch(tBatch);
+ if (!response.isSuccessStatusCode()) {
+ logger.error("TransactionalBatchResponse status code: " +
response.getStatusCode());
+ if (response.getStatusCode() == 409) {
+ if (conflictHandlingStrategy != null &&
conflictHandlingStrategy.equals(IGNORE_CONFLICT.getValue())) {
+ // ignore conflict
+ return;
+ }
+ }
+ String errMsg = response.getErrorMessage();
+ if (errMsg == null) {
+ errMsg = "TransactionalBatchResponse status code: " +
response.getStatusCode();
+ }
+ throw new ProcessException(errMsg);
+ }
+ } catch (CosmosException e) {
+ logger.error("batchResponse-> statusCode: " +
e.getStatusCode() + ", subStatusCode: " + e.getSubStatusCode());
Review comment:
```suggestion
logger.error("batchResponse-> statusCode: {}, subStatusCode:
{}", e.getStatusCode(), e.getSubStatusCode());
```
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -142,10 +186,58 @@ protected void bulkInsert(final List<Map<String, Object>>
records) throws Cosmos
logger.debug("Ignoring duplicate based on selected
conflict resolution strategy");
}
}
+ } else {
+ throw e;
}
}
}
+ private void chooseInsertMethodAndRun(final List<Map<String, Object>> bin,
String partitionKeyField) throws CosmosException, ProcessException {
+ final ComponentLog logger = getLogger();
+ try {
+ if (bin.size() == 1) {
+ insertRecord(bin.get(0), partitionKeyField);
+ } else {
+ insertWithTransactionalBatch(bin, partitionKeyField);
+ }
+ } catch (CosmosException e) {
+ final String errMsg = String.format("CosmosException status code
is %d while handling bin size = %d", e.getStatusCode(), bin.size());
+ logger.error(errMsg);
+ if(e.getStatusCode() == 429) { // request being throttled
Review comment:
```suggestion
if (e.getStatusCode() == 429) { // request being throttled
```
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -189,18 +281,33 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
batch.add(contentMap);
if (batch.size() == ceiling) {
- bulkInsert(batch);
+ bulkInsert(batch, partitionKeyField);
batch = new ArrayList<>();
}
}
if (!error && batch.size() > 0) {
- bulkInsert(batch);
+ bulkInsert(batch, partitionKeyField);
+ }
+ } catch (CosmosException e) {
+ final int statusCode = e.getStatusCode();
+ logger.error("statusCode: " + statusCode + ", subStatusCode: " +
e.getSubStatusCode());
+
+ if (statusCode == 429) {
+ logger.error("Failure due to server-side throttling. Increase
RU setting for your workload");
+ yield = true;
+ } else if (statusCode == 410) {
+ logger.error("A request to change the throughput is currently
in progress");
+ yield = true;
+ } else {
+ error = true;
}
- } catch (SchemaNotFoundException | MalformedRecordException |
IOException | CosmosException e) {
+ } catch (IllegalTypeConversionException | ProcessException |
SchemaNotFoundException | MalformedRecordException | IOException e) {
logger.error("PutAzureCosmoDBRecord failed with error: {}", new
Object[]{e.getMessage()}, e);
Review comment:
I think some of the `logger.error()` calls above might be duplicated here,
since the error messages come from the exception. Do we need both?
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java
##########
@@ -36,7 +36,6 @@
+ " from Azure Portal (Overview->URI)")
.required(false)
.addValidator(StandardValidators.URI_VALIDATOR)
- .sensitive(true)
Review comment:
We probably can't make this change at this point unless you think it's
critical that it be changed. Any existing flows will break because of this
change.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -142,10 +186,58 @@ protected void bulkInsert(final List<Map<String, Object>>
records) throws Cosmos
logger.debug("Ignoring duplicate based on selected
conflict resolution strategy");
}
}
+ } else {
+ throw e;
}
}
}
+ private void chooseInsertMethodAndRun(final List<Map<String, Object>> bin,
String partitionKeyField) throws CosmosException, ProcessException {
+ final ComponentLog logger = getLogger();
+ try {
+ if (bin.size() == 1) {
+ insertRecord(bin.get(0), partitionKeyField);
+ } else {
+ insertWithTransactionalBatch(bin, partitionKeyField);
+ }
+ } catch (CosmosException e) {
Review comment:
```suggestion
} catch (final CosmosException e) {
```
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -123,17 +131,53 @@
return propertyDescriptors;
}
- protected void bulkInsert(final List<Map<String, Object>> records) throws
CosmosException{
- // In the future, this method will be replaced by calling createItems
API
- // for example, this.container.createItems(records);
- // currently, no createItems API available in Azure Cosmos Java SDK
+ private void insertWithTransactionalBatch(final List<Map<String, Object>>
records, String partitionKeyField) throws CosmosException, ProcessException {
final ComponentLog logger = getLogger();
final CosmosContainer container = getContainer();
- for (Map<String, Object> record : records){
+ if (records.size() > 0) {
+ final Map<String, Object> firstRecord = records.get(0);
+ final String recordPartitionKeyValue =
(String)firstRecord.get(partitionKeyField);
+ final TransactionalBatch tBatch =
TransactionalBatch.createTransactionalBatch(new
PartitionKey(recordPartitionKeyValue));
+ for (Map<String, Object> record : records) {
+ if (conflictHandlingStrategy != null &&
conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){
+ tBatch.upsertItemOperation(record);
+ } else {
+ tBatch.createItemOperation(record);
+ }
+ }
try {
- container.createItem(record);
- } catch (ConflictException e) {
- // insert with unique id is expected. In case conflict occurs,
use the selected strategy.
+ final TransactionalBatchResponse response =
container.executeTransactionalBatch(tBatch);
+ if (!response.isSuccessStatusCode()) {
+ logger.error("TransactionalBatchResponse status code: " +
response.getStatusCode());
+ if (response.getStatusCode() == 409) {
+ if (conflictHandlingStrategy != null &&
conflictHandlingStrategy.equals(IGNORE_CONFLICT.getValue())) {
+ // ignore conflict
+ return;
+ }
+ }
+ String errMsg = response.getErrorMessage();
+ if (errMsg == null) {
+ errMsg = "TransactionalBatchResponse status code: " +
response.getStatusCode();
+ }
+ throw new ProcessException(errMsg);
+ }
+ } catch (CosmosException e) {
Review comment:
```suggestion
} catch (final CosmosException e) {
```
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java
##########
@@ -189,18 +281,33 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
batch.add(contentMap);
if (batch.size() == ceiling) {
- bulkInsert(batch);
+ bulkInsert(batch, partitionKeyField);
batch = new ArrayList<>();
}
}
if (!error && batch.size() > 0) {
- bulkInsert(batch);
+ bulkInsert(batch, partitionKeyField);
+ }
+ } catch (CosmosException e) {
+ final int statusCode = e.getStatusCode();
+ logger.error("statusCode: " + statusCode + ", subStatusCode: " +
e.getSubStatusCode());
Review comment:
```suggestion
logger.error("statusCode: {}, subStatusCode: {} ", statusCode,
e.getSubStatusCode());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org