ChengbingLiu commented on code in PR #4368:
URL: https://github.com/apache/flink-cdc/pull/4368#discussion_r3207435683
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java:
##########
@@ -165,14 +166,15 @@ private void handleFailedRequest(
LOG.debug("The BulkRequest has failed", error);
numRecordsOutErrorsCounter.inc(requestEntries.size());
- if (isRetryable(error.getCause())) {
- requestResult.accept(requestEntries);
+ Throwable retryableError = error.getCause() != null ? error.getCause()
: error;
+ if (isRetryable(retryableError)) {
+ resultHandler.retryForEntries(requestEntries);
}
Review Comment:
The ResultHandler API has a `completeExceptionally(Exception)` method to get
it fail-fast. I think we should better handle the case where the error is not
retriable:
```
if (isRetryable(retryableError)) {
resultHandler.retryForEntries(requestEntries);
} else {
resultHandler.completeExceptionally(
retryableError instanceof Exception
? (Exception) retryableError
: new FlinkRuntimeException(retryableError));
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]