This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3db4a781676 HOTFIX: fix compilation error
3db4a781676 is described below
commit 3db4a781676199af0be2b67989ca2204c64788fc
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Jul 29 21:07:49 2024 -0700
HOTFIX: fix compilation error
---
.../apache/kafka/streams/processor/internals/ProcessorNode.java | 2 +-
.../kafka/streams/processor/internals/RecordCollectorImpl.java | 8 ++++++--
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 65eec47cb1d..763edc9a045 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -205,7 +205,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
throw e;
} catch (final Exception e) {
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
- null,
+ null, // only required to pass for DeserializationExceptionHandler
internalProcessorContext.topic(),
internalProcessorContext.partition(),
internalProcessorContext.offset(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 35097153a5f..de4afc2c924 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -301,12 +301,14 @@ public class RecordCollectorImpl implements RecordCollector {
try {
final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+ null, // only required to pass for DeserializationExceptionHandler
context.recordContext().topic(),
context.recordContext().partition(),
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
- taskId);
+ taskId
+ );
response = productionExceptionHandler.handleSerializationException(errorHandlerContext, record, exception, origin);
} catch (final Exception e) {
log.error("Fatal when handling serialization exception", e);
@@ -395,12 +397,14 @@ public class RecordCollectorImpl implements RecordCollector {
sendException.set(new TaskCorruptedException(Collections.singleton(taskId)));
} else {
final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+ null, // only required to pass for DeserializationExceptionHandler
context.recordContext().topic(),
context.recordContext().partition(),
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
- taskId);
+ taskId
+ );
if (productionExceptionHandler.handle(errorHandlerContext, serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";