dannycranmer commented on a change in pull request #18553:
URL: https://github.com/apache/flink/pull/18553#discussion_r795462214
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -176,11 +179,35 @@ private void handlePartiallyFailedRequest(
private boolean isRetryable(Throwable err) {
if (err instanceof CompletionException
- && err.getCause() instanceof ResourceNotFoundException) {
+ && isInterruptingSignalException(ExceptionUtils.stripCompletionException(err))) {
+ getFatalExceptionCons().accept(new FlinkException("Running job was cancelled"));
+ return false;
+ }
+ if (err instanceof CompletionException
Review comment:
This code is complex and hard to read. I suggest we use a helper. Would
this work?
```
if (ExceptionUtils.findThrowable(err, ResourceNotFoundException.class).isPresent()) {
....
}
```
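To illustrate what such a helper does, here is a minimal, dependency-free sketch of a cause-chain search. `CauseChain` and `find` are hypothetical names made up for this illustration; this is not Flink's implementation of `ExceptionUtils.findThrowable`, only an approximation of the idea under the assumption that the helper walks `getCause()` links.

```java
import java.util.Optional;

/** Hypothetical stand-in for a cause-chain search helper; not Flink's code. */
final class CauseChain {
    // Assumed depth limit so a cyclic cause chain cannot loop forever.
    private static final int MAX_DEPTH = 64;

    private CauseChain() {}

    /** Returns the first throwable in the cause chain that is an instance of {@code type}. */
    static <T extends Throwable> Optional<T> find(Throwable root, Class<T> type) {
        Throwable current = root;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```

With a search like this, the caller no longer needs a separate `instanceof CompletionException` check, because the wrapped exception is found however deeply it is nested.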
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java
##########
@@ -39,12 +39,12 @@ public KinesisDataStreamsException(final String message, final Throwable cause)
public KinesisDataStreamsFailFastException() {
super(
- "Encountered an exception while persisting records, not retrying due to {failOnError} being set.");
Review comment:
Instead of duplicating this string, either call `this(null)` or extract it
to a constant.
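A minimal sketch of both options combined, using a hypothetical `FailFastExceptionSketch` class rather than the PR's actual exception, and assuming the parent constructor accepts a null cause (as `RuntimeException` does):

```java
/** Hypothetical exception for illustration; not the class from this PR. */
class FailFastExceptionSketch extends RuntimeException {
    // The message is written once and shared by both constructors.
    static final String MESSAGE =
            "Encountered an exception while persisting records, not retrying due to {failOnError} being set.";

    FailFastExceptionSketch() {
        // Delegates to the cause-taking constructor; null means "no known cause".
        this(null);
    }

    FailFastExceptionSketch(Throwable cause) {
        super(MESSAGE, cause);
    }
}
```

Note that `this(null)` only compiles unambiguously while there is a single one-argument constructor. If a `String` overload were added later, the call would need a cast such as `this((Throwable) null)`.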
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -176,11 +179,35 @@ private void handlePartiallyFailedRequest(
private boolean isRetryable(Throwable err) {
if (err instanceof CompletionException
- && err.getCause() instanceof ResourceNotFoundException) {
+ && isInterruptingSignalException(ExceptionUtils.stripCompletionException(err))) {
+ getFatalExceptionCons().accept(new FlinkException("Running job was cancelled"));
+ return false;
+ }
+ if (err instanceof CompletionException
+ && ExceptionUtils.stripCompletionException(err)
+ instanceof ResourceNotFoundException) {
getFatalExceptionCons()
.accept(
new KinesisDataStreamsException(
- "Encountered non-recoverable exception", err));
+ "Encountered non-recoverable exception relating to not being able to find the specified resources",
+ err));
+ return false;
+ }
+ if (err instanceof CompletionException
+ && ExceptionUtils.stripCompletionException(err) instanceof StsException) {
+ getFatalExceptionCons()
+ .accept(
+ new KinesisDataStreamsException(
+ "Encountered non-recoverable exception relating to the provided credentials.",
+ err));
+ return false;
+ }
+ if (err instanceof Error) {
+ getFatalExceptionCons()
+ .accept(
+ new KinesisDataStreamsException(
+ "Encountered non-recoverable exception relating to not being able to find the specified resources",
Review comment:
Is this message correct for all subclasses of `Error`? I think we
should make it more generic, something like `Encountered non-recoverable
exception`.
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java
##########
@@ -39,12 +39,12 @@ public KinesisDataStreamsException(final String message, final Throwable cause)
public KinesisDataStreamsFailFastException() {
super(
- "Encountered an exception while persisting records, not retrying due to {failOnError} being set.");
+ "Encountered an exception while persisting records, not retrying due to either: {failOnError} being set or the exception should not be retried.");
}
public KinesisDataStreamsFailFastException(final Throwable cause) {
super(
- "Encountered an exception while persisting records, not retrying due to {failOnError} being set.",
+ "Encountered an exception while persisting records, not retrying due to either: {failOnError} being set or the exception should not be retried.",
Review comment:
On second thought, is this even correct? It seems we are throwing
`KinesisDataStreamsException` in this PR.
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -176,11 +179,35 @@ private void handlePartiallyFailedRequest(
private boolean isRetryable(Throwable err) {
if (err instanceof CompletionException
- && err.getCause() instanceof ResourceNotFoundException) {
+ && isInterruptingSignalException(ExceptionUtils.stripCompletionException(err))) {
+ getFatalExceptionCons().accept(new FlinkException("Running job was cancelled"));
+ return false;
+ }
+ if (err instanceof CompletionException
+ && ExceptionUtils.stripCompletionException(err)
+ instanceof ResourceNotFoundException) {
getFatalExceptionCons()
.accept(
new KinesisDataStreamsException(
- "Encountered non-recoverable exception", err));
+ "Encountered non-recoverable exception relating to not being able to find the specified resources",
+ err));
+ return false;
+ }
+ if (err instanceof CompletionException
+ && ExceptionUtils.stripCompletionException(err) instanceof StsException) {
+ getFatalExceptionCons()
+ .accept(
+ new KinesisDataStreamsException(
+ "Encountered non-recoverable exception relating to the provided credentials.",
+ err));
+ return false;
+ }
+ if (err instanceof Error) {
+ getFatalExceptionCons()
Review comment:
Happy to merge this to fix the bug, but this is too much logic in the
implementation. Since this is common to all sinks, can we pull it up to the
parent?
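One possible shape for that refactoring, sketched with hypothetical names (`BaseSinkWriterSketch`, `isFatalForSink`) that are not part of Flink's API: the parent owns the generic checks and the reporting, and each sink only says which of its own exceptions are fatal.

```java
import java.util.function.Consumer;

/** Hypothetical base class; names are illustrative, not Flink's API. */
abstract class BaseSinkWriterSketch {
    private final Consumer<Exception> fatalExceptionConsumer;

    BaseSinkWriterSketch(Consumer<Exception> fatalExceptionConsumer) {
        this.fatalExceptionConsumer = fatalExceptionConsumer;
    }

    /** Shared check: reports the failure and returns false when err must not be retried. */
    protected final boolean isRetryable(Throwable err) {
        // JVM errors are never retried; everything else is delegated to the sink.
        if (err instanceof Error || isFatalForSink(err)) {
            fatalExceptionConsumer.accept(
                    new RuntimeException("Encountered non-recoverable exception", err));
            return false;
        }
        return true;
    }

    /** Sink-specific hook, e.g. a missing stream or rejected credentials. */
    protected abstract boolean isFatalForSink(Throwable err);
}
```

A Kinesis writer would then implement only `isFatalForSink`, which keeps the per-sink code down to the list of exception types it considers fatal.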
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java
##########
@@ -39,12 +39,12 @@ public KinesisDataStreamsException(final String message, final Throwable cause)
public KinesisDataStreamsFailFastException() {
super(
- "Encountered an exception while persisting records, not retrying due to {failOnError} being set.");
+ "Encountered an exception while persisting records, not retrying due to either: {failOnError} being set or the exception should not be retried.");
}
public KinesisDataStreamsFailFastException(final Throwable cause) {
super(
- "Encountered an exception while persisting records, not retrying due to {failOnError} being set.",
+ "Encountered an exception while persisting records, not retrying due to either: {failOnError} being set or the exception should not be retried.",
Review comment:
Semantically, it would make more sense to split the exception rather than
add ambiguity for the user. For example, we could add a
`KinesisDataStreamsFatalException`.
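A rough sketch of the split, using hypothetical class names and an assumed `RuntimeException` parent (the real classes may extend something else): each exception type then carries exactly one meaning, so the message no longer needs an "either ... or" clause.

```java
/** Hypothetical: thrown when the user-configured failOnError flag forbids retries. */
class FailOnErrorExceptionSketch extends RuntimeException {
    FailOnErrorExceptionSketch(Throwable cause) {
        super(
                "Encountered an exception while persisting records, not retrying due to {failOnError} being set.",
                cause);
    }
}

/** Hypothetical: thrown when the failure itself can never succeed on retry. */
class FatalExceptionSketch extends RuntimeException {
    FatalExceptionSketch(String message, Throwable cause) {
        super(message, cause);
    }
}
```

Callers and users can then tell the two cases apart by type alone, without parsing the message text.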