exceptionfactory commented on code in PR #7955:
URL: https://github.com/apache/nifi/pull/7955#discussion_r1376861876
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java:
##########
@@ -484,8 +484,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
+ "Will roll back session and discard any partially received data.", lease);
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+ context.yield();
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
Review Comment:
```suggestion
getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
```
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java:
##########
@@ -484,8 +484,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
+ "Will roll back session and discard any partially received data.", lease);
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
Review Comment:
```suggestion
getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
```
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -585,6 +586,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
failureStrategy.routeFlowFiles(session, flowFiles);
context.yield();
}
+ } catch (final KafkaException e) {
+ getLogger().error("Failed to obtain Kafka publisher; will yield Processor", e);
Review Comment:
Recommend adjusting the message slightly: the logger output already identifies
the `Processor`, and the operation also involves transferring files.
```suggestion
getLogger().error("Failed to obtain Kafka Producer", e);
```
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -585,6 +586,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
failureStrategy.routeFlowFiles(session, flowFiles);
context.yield();
}
+ } catch (final KafkaException e) {
+ getLogger().error("Failed to obtain Kafka publisher; will yield Processor", e);
Review Comment:
Is the `KafkaException` only thrown when failing to obtain a Producer? This
is a large block of code that seems like it could trigger other issues, but
perhaps those are already handled?
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java:
##########
@@ -509,6 +510,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
failureStrategy.routeFlowFiles(session, flowFiles);
context.yield();
}
+ } catch (final KafkaException e) {
+ getLogger().error("Failed to obtain Kafka publisher; will yield Processor", e);
Review Comment:
```suggestion
getLogger().error("Failed to obtain Kafka Producer", e);
```
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -541,8 +541,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
+ "Will roll back session and discard any partially received data.", lease);
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+ context.yield();
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
Review Comment:
```suggestion
getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
```
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -541,8 +541,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
+ "Will roll back session and discard any partially received data.", lease);
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
Review Comment:
This is a good opportunity to remove the redundant `due to` portion of the
log message, since the exception is already included in the stack trace.
```suggestion
getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
```
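To illustrate the redundancy, here is a minimal, self-contained sketch. It assumes SLF4J-style behavior, where a trailing `Throwable` not consumed by a `{}` placeholder is rendered as a stack trace. The `LogDuplicationSketch` class and its `render` and `occurrences` helpers are hypothetical stand-ins written for this example; they are not NiFi's `ComponentLog` implementation.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class LogDuplicationSketch {

    // Hypothetical stand-in for a logger: fills {} placeholders in order, then
    // appends the stack trace of a trailing Throwable that no placeholder consumed.
    static String render(String template, Object... args) {
        StringBuilder out = new StringBuilder();
        int argIndex = 0;
        int pos = 0;
        int hit;
        while ((hit = template.indexOf("{}", pos)) >= 0 && argIndex < args.length) {
            out.append(template, pos, hit).append(args[argIndex++]);
            pos = hit + 2;
        }
        out.append(template.substring(pos));
        if (argIndex < args.length && args[args.length - 1] instanceof Throwable) {
            StringWriter trace = new StringWriter();
            ((Throwable) args[args.length - 1]).printStackTrace(new PrintWriter(trace, true));
            out.append(System.lineSeparator()).append(trace);
        }
        return out.toString();
    }

    // Counts non-overlapping occurrences of needle in text.
    static int occurrences(String text, String needle) {
        int count = 0;
        for (int i = text.indexOf(needle); i >= 0; i = text.indexOf(needle, i + needle.length())) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        RuntimeException kex = new RuntimeException("broker unavailable");
        // Original form: the exception fills a placeholder and is also the trailing Throwable.
        String before = render("will close the lease {} due to {}", "lease-1", kex, kex);
        // Suggested form: the exception appears only as the trailing Throwable.
        String after = render("will close the lease {}", "lease-1", kex);
        System.out.println("before: " + occurrences(before, "broker unavailable"));
        System.out.println("after: " + occurrences(after, "broker unavailable"));
    }
}
```

Under these assumptions, the original form renders the exception message twice (once inline, once at the head of the stack trace), while the suggested form renders it once.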