This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 2c957a6e5c1d3ee9c8251e11694040b85662faf9 Author: Matthias J. Sax <[email protected]> AuthorDate: Tue Jul 30 17:10:40 2024 -0700 MINOR: simplify code which calles `Punctuator.punctuate()` (#16725) Reviewers: Bill Bejeck <[email protected]> --- .../apache/kafka/streams/processor/internals/ProcessorNode.java | 7 +------ .../org/apache/kafka/streams/processor/internals/StreamTask.java | 4 ++-- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 175c9e104ef..eaed7c6b8d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -24,7 +24,6 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.errors.internals.FailedProcessingException; -import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; @@ -203,7 +202,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> { } catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) { // Rethrow exceptions that should not be handled here throw e; - } catch (final Exception e) { + } catch (final RuntimeException e) { final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( null, // only required to pass for DeserializationExceptionHandler internalProcessorContext.topic(), @@ -232,10 +231,6 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> { } } - public void punctuate(final long timestamp, final Punctuator punctuator) { - punctuator.punctuate(timestamp); - } - public boolean isTerminalNode() { return children.isEmpty(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8cbe5780b90..8b253c6e16a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -806,7 +806,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } catch (final StreamsException exception) { record = null; throw exception; - } catch (final Exception e) { + } catch (final RuntimeException e) { handleException(e); } finally { processorContext.setCurrentNode(null); @@ -914,7 +914,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } try { - maybeMeasureLatency(() -> node.punctuate(timestamp, punctuator), time, punctuateLatencySensor); + maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor); } catch (final StreamsException e) { throw e; } catch (final RuntimeException e) {
