This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2c957a6e5c1d3ee9c8251e11694040b85662faf9
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jul 30 17:10:40 2024 -0700

    MINOR: simplify code which calls `Punctuator.punctuate()` (#16725)
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../apache/kafka/streams/processor/internals/ProcessorNode.java    | 7 +------
 .../org/apache/kafka/streams/processor/internals/StreamTask.java   | 4 ++--
 2 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 175c9e104ef..eaed7c6b8d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
 import org.apache.kafka.streams.errors.internals.FailedProcessingException;
-import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
 import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;
@@ -203,7 +202,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         } catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) {
             // Rethrow exceptions that should not be handled here
             throw e;
-        } catch (final Exception e) {
+        } catch (final RuntimeException e) {
             final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
                 null, // only required to pass for DeserializationExceptionHandler
                 internalProcessorContext.topic(),
@@ -232,10 +231,6 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         }
     }
 
-    public void punctuate(final long timestamp, final Punctuator punctuator) {
-        punctuator.punctuate(timestamp);
-    }
-
     public boolean isTerminalNode() {
         return children.isEmpty();
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8cbe5780b90..8b253c6e16a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -806,7 +806,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         } catch (final StreamsException exception) {
             record = null;
             throw exception;
-        } catch (final Exception e) {
+        } catch (final RuntimeException e) {
             handleException(e);
         } finally {
             processorContext.setCurrentNode(null);
@@ -914,7 +914,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
 
         try {
-            maybeMeasureLatency(() -> node.punctuate(timestamp, punctuator), time, punctuateLatencySensor);
+            maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor);
         } catch (final StreamsException e) {
             throw e;
         } catch (final RuntimeException e) {

Reply via email to