This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f02c28b21dc KAFKA-17994 Checked exceptions are not handled (#17817)
f02c28b21dc is described below
commit f02c28b21dc4d89e1d7f2e16f8f9e067a9575e61
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Nov 15 04:36:03 2024 -0800
KAFKA-17994 Checked exceptions are not handled (#17817)
Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../internals/GlobalStateManagerImpl.java | 5 ++++-
.../streams/processor/internals/ProcessorNode.java | 10 +++++++--
.../processor/internals/RecordCollectorImpl.java | 25 ++++++++++++++++------
.../processor/internals/RecordDeserializer.java | 12 ++++++++---
.../streams/processor/internals/StreamTask.java | 24 +++++++--------------
5 files changed, 48 insertions(+), 28 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index ad53634386e..96d9074eff3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -319,7 +319,10 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
record.headers()));
restoreCount++;
}
- } catch (final RuntimeException deserializationException) {
+ } catch (final Exception deserializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
handleDeserializationFailure(
deserializationExceptionHandler,
globalProcessorContext,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 9e2a79f0f25..2bb58eb6b82 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -203,7 +203,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
} catch (final FailedProcessingException | TaskCorruptedException |
TaskMigratedException e) {
// Rethrow exceptions that should not be handled here
throw e;
- } catch (final RuntimeException processingException) {
+ } catch (final Exception processingException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
final ErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
null, // only required to pass for
DeserializationExceptionHandler
internalProcessorContext.topic(),
@@ -220,7 +223,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
processingExceptionHandler.handle(errorHandlerContext,
record, processingException),
"Invalid ProductionExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+                // while Java distinguishes checked vs unchecked exceptions, other languages
+                // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                // (instead of `RuntimeException`) to work well with those languages
log.error(
"Processing error callback failed after processing error
for record: {}",
errorHandlerContext,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 8dedeb7dad4..81db581d04f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -210,7 +210,10 @@ public class RecordCollectorImpl implements
RecordCollector {
key,
keySerializer,
exception);
- } catch (final RuntimeException serializationException) {
+ } catch (final Exception serializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
handleException(
ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
topic,
@@ -221,7 +224,8 @@ public class RecordCollectorImpl implements RecordCollector
{
timestamp,
processorNodeId,
context,
- serializationException);
+ serializationException
+ );
return;
}
@@ -234,7 +238,10 @@ public class RecordCollectorImpl implements
RecordCollector {
value,
valueSerializer,
exception);
- } catch (final RuntimeException serializationException) {
+ } catch (final Exception serializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
handleException(
ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
topic,
@@ -312,7 +319,7 @@ public class RecordCollectorImpl implements RecordCollector
{
final Long timestamp,
final String processorNodeId,
final InternalProcessorContext<Void,
Void> context,
-                                 final RuntimeException serializationException) {
+                                 final Exception serializationException) {
log.debug(String.format("Error serializing record for topic %s",
topic), serializationException);
final ProducerRecord<K, V> record = new ProducerRecord<>(topic,
partition, timestamp, key, value, headers);
@@ -328,7 +335,10 @@ public class RecordCollectorImpl implements
RecordCollector {
),
"Invalid ProductionExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
log.error(
String.format(
"Production error callback failed after serialization
error for record %s: %s",
@@ -442,7 +452,10 @@ public class RecordCollectorImpl implements
RecordCollector {
),
"Invalid ProductionExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+                // while Java distinguishes checked vs unchecked exceptions, other languages
+                // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                // (instead of `RuntimeException`) to work well with those languages
log.error(
"Production error callback failed after production error
for record {}",
serializedRecord,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 5ddafe654e9..6f9fe989552 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -70,7 +70,10 @@ public class RecordDeserializer {
rawRecord.headers(),
rawRecord.leaderEpoch()
);
- } catch (final RuntimeException deserializationException) {
+ } catch (final Exception deserializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
handleDeserializationFailure(deserializationExceptionHandler,
processorContext, deserializationException, rawRecord, log,
droppedRecordsSensor, sourceNode().name());
return null; // 'handleDeserializationFailure' would either throw
or swallow -- if we swallow we need to skip the record by returning 'null'
}
@@ -78,7 +81,7 @@ public class RecordDeserializer {
public static void handleDeserializationFailure(final
DeserializationExceptionHandler deserializationExceptionHandler,
final ProcessorContext<?,
?> processorContext,
-                                                    final RuntimeException deserializationException,
+                                                    final Exception deserializationException,
final
ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor
droppedRecordsSensor,
@@ -100,7 +103,10 @@ public class RecordDeserializer {
deserializationExceptionHandler.handle(errorHandlerContext,
rawRecord, deserializationException),
"Invalid DeserializationExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
log.error(
"Deserialization error callback failed after deserialization
error for record {}",
rawRecord,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 92772c686af..1fdd298088b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -52,8 +52,6 @@ import
org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -883,18 +881,6 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
processTimeMs = 0L;
}
- private String getStacktraceString(final Throwable e) {
- String stacktrace = null;
- try (final StringWriter stringWriter = new StringWriter();
- final PrintWriter printWriter = new PrintWriter(stringWriter)) {
- e.printStackTrace(printWriter);
- stacktrace = stringWriter.toString();
- } catch (final IOException ioe) {
-            log.error("Encountered error extracting stacktrace from this exception", ioe);
- }
- return stacktrace;
- }
-
/**
* @throws IllegalStateException if the current node is not null
* @throws TaskMigratedException if the task producer got fenced (EOS only)
@@ -937,7 +923,10 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
throw createStreamsException(node.name(), e.getCause());
} catch (final TaskCorruptedException | TaskMigratedException e) {
throw e;
- } catch (final RuntimeException processingException) {
+ } catch (final Exception processingException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
final ErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
null,
recordContext.topic(),
@@ -955,7 +944,10 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
processingExceptionHandler.handle(errorHandlerContext,
null, processingException),
"Invalid ProcessingExceptionHandler response."
);
- } catch (final RuntimeException fatalUserException) {
+ } catch (final Exception fatalUserException) {
+                // while Java distinguishes checked vs unchecked exceptions, other languages
+                // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                // (instead of `RuntimeException`) to work well with those languages
log.error(
"Processing error callback failed after processing error
for record: {}",
errorHandlerContext,