This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 2127ae63290 KAFKA-17994 Checked exceptions are not handled (#17817)
2127ae63290 is described below
commit 2127ae63290e1309e0d1ebbc84f6ef3a6f81bae2
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Nov 15 04:36:03 2024 -0800
KAFKA-17994 Checked exceptions are not handled (#17817)
Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../internals/GlobalStateManagerImpl.java | 5 ++++-
.../streams/processor/internals/ProcessorNode.java | 10 +++++++--
.../processor/internals/RecordCollectorImpl.java | 25 ++++++++++++++++------
.../processor/internals/RecordDeserializer.java | 12 ++++++++---
.../streams/processor/internals/StreamTask.java | 24 +++++++--------------
5 files changed, 48 insertions(+), 28 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 12d4c6c603d..24cf51a67be 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -319,7 +319,10 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
record.headers()));
restoreCount++;
}
- } catch (final RuntimeException deserializationException) {
+ } catch (final Exception deserializationException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
handleDeserializationFailure(
deserializationExceptionHandler,
globalProcessorContext,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 2b945f2da29..c1f9c423049 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -203,7 +203,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
} catch (final FailedProcessingException | TaskCorruptedException |
TaskMigratedException e) {
// Rethrow exceptions that should not be handled here
throw e;
- } catch (final RuntimeException processingException) {
+ } catch (final Exception processingException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
final ErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
null, // only required to pass for
DeserializationExceptionHandler
internalProcessorContext.topic(),
@@ -220,7 +223,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
processingExceptionHandler.handle(errorHandlerContext,
record, processingException),
"Invalid ProductionExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
log.error(
"Processing error callback failed after processing error
for record: {}",
errorHandlerContext,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index b725feb4efa..49878b2e883 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -211,7 +211,10 @@ public class RecordCollectorImpl implements
RecordCollector {
key,
keySerializer,
exception);
- } catch (final RuntimeException serializationException) {
+ } catch (final Exception serializationException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
handleException(
ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
topic,
@@ -222,7 +225,8 @@ public class RecordCollectorImpl implements RecordCollector
{
timestamp,
processorNodeId,
context,
- serializationException);
+ serializationException
+ );
return;
}
@@ -235,7 +239,10 @@ public class RecordCollectorImpl implements
RecordCollector {
value,
valueSerializer,
exception);
- } catch (final RuntimeException serializationException) {
+ } catch (final Exception serializationException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
handleException(
ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
topic,
@@ -313,7 +320,7 @@ public class RecordCollectorImpl implements RecordCollector
{
final Long timestamp,
final String processorNodeId,
final InternalProcessorContext<Void,
Void> context,
- final RuntimeException
serializationException) {
+ final Exception
serializationException) {
log.debug(String.format("Error serializing record for topic %s",
topic), serializationException);
final ProducerRecord<K, V> record = new ProducerRecord<>(topic,
partition, timestamp, key, value, headers);
@@ -329,7 +336,10 @@ public class RecordCollectorImpl implements
RecordCollector {
),
"Invalid ProductionExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
log.error(
String.format(
"Production error callback failed after serialization
error for record %s: %s",
@@ -446,7 +456,10 @@ public class RecordCollectorImpl implements
RecordCollector {
),
"Invalid ProductionExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
log.error(
"Production error callback failed after production
error for record {}",
serializedRecord,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index aefa15da660..a4179509c6a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -71,7 +71,10 @@ public class RecordDeserializer {
rawRecord.headers(),
Optional.empty()
);
- } catch (final RuntimeException deserializationException) {
+ } catch (final Exception deserializationException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
handleDeserializationFailure(deserializationExceptionHandler,
processorContext, deserializationException, rawRecord, log,
droppedRecordsSensor, sourceNode().name());
return null; // 'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null'
}
@@ -79,7 +82,7 @@ public class RecordDeserializer {
public static void handleDeserializationFailure(final
DeserializationExceptionHandler deserializationExceptionHandler,
final ProcessorContext<?,
?> processorContext,
- final RuntimeException
deserializationException,
+ final Exception
deserializationException,
final
ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor
droppedRecordsSensor,
@@ -101,7 +104,10 @@ public class RecordDeserializer {
deserializationExceptionHandler.handle(errorHandlerContext,
rawRecord, deserializationException),
"Invalid DeserializationExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
log.error(
"Deserialization error callback failed after deserialization
error for record {}",
rawRecord,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 76df4693bf6..59ed6b92cd9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -52,8 +52,6 @@ import
org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -884,18 +882,6 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
processTimeMs = 0L;
}
- private String getStacktraceString(final Throwable e) {
- String stacktrace = null;
- try (final StringWriter stringWriter = new StringWriter();
- final PrintWriter printWriter = new PrintWriter(stringWriter)) {
- e.printStackTrace(printWriter);
- stacktrace = stringWriter.toString();
- } catch (final IOException ioe) {
- log.error("Encountered error extracting stacktrace from this
exception", ioe);
- }
- return stacktrace;
- }
-
/**
* @throws IllegalStateException if the current node is not null
* @throws TaskMigratedException if the task producer got fenced (EOS only)
@@ -938,7 +924,10 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
throw createStreamsException(node.name(), e.getCause());
} catch (final TaskCorruptedException | TaskMigratedException e) {
throw e;
- } catch (final RuntimeException processingException) {
+ } catch (final Exception processingException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
final ErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
null,
recordContext.topic(),
@@ -956,7 +945,10 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
processingExceptionHandler.handle(errorHandlerContext,
null, processingException),
"Invalid ProcessingExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
log.error(
"Processing error callback failed after processing error
for record: {}",
errorHandlerContext,