exceptionfactory commented on code in PR #7955:
URL: https://github.com/apache/nifi/pull/7955#discussion_r1376861876


##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java:
##########
@@ -484,8 +484,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
                 getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+                context.yield();
             } catch (final Throwable t) {
                 getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);

Review Comment:
   ```suggestion
                   getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java:
##########
@@ -484,8 +484,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
                 getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);

Review Comment:
   ```suggestion
                   getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -585,6 +586,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 failureStrategy.routeFlowFiles(session, flowFiles);
                 context.yield();
             }
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka publisher; will yield Processor", e);

Review Comment:
   Recommend adjusting the message slightly: the logger output already includes the `Processor`, and the operation also involves transferring files, not only yielding.
   ```suggestion
               getLogger().error("Failed to obtain Kafka Producer", e);
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -585,6 +586,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 failureStrategy.routeFlowFiles(session, flowFiles);
                 context.yield();
             }
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka publisher; will yield Processor", e);

Review Comment:
   Is the `KafkaException` only thrown when failing to obtain a Producer? This 
is a large block of code that seems like it could trigger other issues, but 
perhaps those are already handled?
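One way to make the scope question concrete is to compare it with a layout where the catch covers only the step that obtains the Producer. The sketch below is a hypothetical, self-contained illustration and not the PR's code: `KafkaLikeException`, `Publisher`, `PublisherPool`, and `trigger` are invented stand-ins, and the returned strings only mark which branch ran.

```java
public class ProducerScopeSketch {

    // Hypothetical stand-in for org.apache.kafka.common.KafkaException.
    static class KafkaLikeException extends RuntimeException {
        KafkaLikeException(final String message) {
            super(message);
        }
    }

    // Hypothetical stand-ins for the publisher lease and its pool.
    interface Publisher {
        void publish(String record);
    }

    interface PublisherPool {
        Publisher obtain();
    }

    // Assumption: obtaining the publisher and publishing are separate steps,
    // so each failure mode can be caught and reported on its own.
    static String trigger(final PublisherPool pool, final String record) {
        final Publisher publisher;
        try {
            publisher = pool.obtain();
        } catch (final KafkaLikeException e) {
            // Only acquisition failures reach this branch.
            return "yield: failed to obtain Kafka Producer";
        }

        try {
            publisher.publish(record);
            return "success";
        } catch (final KafkaLikeException e) {
            // Publish-time failures are reported separately.
            return "failure: publish error";
        }
    }

    public static void main(final String[] args) {
        System.out.println(trigger(() -> { throw new KafkaLikeException("no brokers"); }, "r1"));
        System.out.println(trigger(() -> record -> { throw new KafkaLikeException("timeout"); }, "r2"));
        System.out.println(trigger(() -> record -> { }, "r3"));
    }
}
```

With a single catch around the whole block, the first two cases would produce the same "Failed to obtain" message, which is the ambiguity the question points at. Whether the existing code already handles publish-time errors elsewhere is not something this sketch can answer.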



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java:
##########
@@ -509,6 +510,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 failureStrategy.routeFlowFiles(session, flowFiles);
                 context.yield();
             }
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka publisher; will yield Processor", e);

Review Comment:
   ```suggestion
               getLogger().error("Failed to obtain Kafka Producer", e);
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -541,8 +541,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
                 getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+                context.yield();
             } catch (final Throwable t) {
                 getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);

Review Comment:
   ```suggestion
                   getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -541,8 +541,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
                 getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);

Review Comment:
   This is a good opportunity to remove the duplicative `due to` portion of the log message, since the exception is already included in the stack trace.
   ```suggestion
                   getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
   ```
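
The duplication can be shown with a small, self-contained sketch. It does not use NiFi's `ComponentLog`; `render` is a hypothetical stand-in that fills `{}` placeholders in order and then appends the stack trace of the exception, which is assumed here to approximate what a logger does when it receives a trailing `Throwable`. `count` is a helper invented for the comparison.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class LogMessageSketch {

    // Hypothetical stand-in for a logger: substitutes {} placeholders in order,
    // then appends the stack trace of the supplied cause.
    static String render(final String template, final Throwable cause, final Object... args) {
        final StringBuilder out = new StringBuilder();
        int from = 0;
        for (final Object arg : args) {
            final int at = template.indexOf("{}", from);
            if (at < 0) {
                break;
            }
            out.append(template, from, at).append(arg);
            from = at + 2;
        }
        out.append(template.substring(from));

        final StringWriter trace = new StringWriter();
        cause.printStackTrace(new PrintWriter(trace, true));
        return out + System.lineSeparator() + trace;
    }

    // Counts non-overlapping occurrences of needle in text.
    static int count(final String text, final String needle) {
        int hits = 0;
        for (int at = text.indexOf(needle); at >= 0; at = text.indexOf(needle, at + needle.length())) {
            hits++;
        }
        return hits;
    }

    public static void main(final String[] args) {
        final RuntimeException kex = new RuntimeException("broker unavailable");

        // Before: the exception fills a placeholder and is also passed as the cause.
        final String before = render("will close the lease {} due to {}", kex, "lease-1", kex);
        // After: the exception is passed only as the cause.
        final String after = render("will close the lease {}", kex, "lease-1");

        System.out.println("before: " + count(before, "broker unavailable"));
        System.out.println("after: " + count(after, "broker unavailable"));
    }
}
```

In this sketch the exception text appears twice in the "before" output (once in the message line, once in the stack trace) and once in the "after" output, which is the redundancy the suggestion removes.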


