This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f02c28b21dc KAFKA-17994 Checked exceptions are not handled (#17817)
f02c28b21dc is described below

commit f02c28b21dc4d89e1d7f2e16f8f9e067a9575e61
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Nov 15 04:36:03 2024 -0800

    KAFKA-17994 Checked exceptions are not handled (#17817)
    
    Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../internals/GlobalStateManagerImpl.java          |  5 ++++-
 .../streams/processor/internals/ProcessorNode.java | 10 +++++++--
 .../processor/internals/RecordCollectorImpl.java   | 25 ++++++++++++++++------
 .../processor/internals/RecordDeserializer.java    | 12 ++++++++---
 .../streams/processor/internals/StreamTask.java    | 24 +++++++--------------
 5 files changed, 48 insertions(+), 28 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index ad53634386e..96d9074eff3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -319,7 +319,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                                 record.headers()));
                             restoreCount++;
                         }
-                    } catch (final RuntimeException deserializationException) {
+                    } catch (final Exception deserializationException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
                         handleDeserializationFailure(
                             deserializationExceptionHandler,
                             globalProcessorContext,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 9e2a79f0f25..2bb58eb6b82 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -203,7 +203,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         } catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) {
             // Rethrow exceptions that should not be handled here
             throw e;
-        } catch (final RuntimeException processingException) {
+        } catch (final Exception processingException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
             final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
                 null, // only required to pass for DeserializationExceptionHandler
                 internalProcessorContext.topic(),
@@ -220,7 +223,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
                     processingExceptionHandler.handle(errorHandlerContext, record, processingException),
                     "Invalid ProductionExceptionHandler response."
                 );
-            } catch (final RuntimeException fatalUserException) {
+            } catch (final Exception fatalUserException) {
+                // while Java distinguishes checked vs unchecked exceptions, other languages
+                // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                // (instead of `RuntimeException`) to work well with those languages
                 log.error(
                     "Processing error callback failed after processing error for record: {}",
                     errorHandlerContext,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 8dedeb7dad4..81db581d04f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -210,7 +210,10 @@ public class RecordCollectorImpl implements RecordCollector {
                 key,
                 keySerializer,
                 exception);
-        } catch (final RuntimeException serializationException) {
+        } catch (final Exception serializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
             handleException(
                 ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
                 topic,
@@ -221,7 +224,8 @@ public class RecordCollectorImpl implements RecordCollector {
                 timestamp,
                 processorNodeId,
                 context,
-                serializationException);
+                serializationException
+            );
             return;
         }
 
@@ -234,7 +238,10 @@ public class RecordCollectorImpl implements RecordCollector {
                 value,
                 valueSerializer,
                 exception);
-        } catch (final RuntimeException serializationException) {
+        } catch (final Exception serializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
             handleException(
                 ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
                 topic,
@@ -312,7 +319,7 @@ public class RecordCollectorImpl implements RecordCollector {
                                         final Long timestamp,
                                         final String processorNodeId,
                                         final InternalProcessorContext<Void, Void> context,
-                                        final RuntimeException serializationException) {
+                                        final Exception serializationException) {
         log.debug(String.format("Error serializing record for topic %s", topic), serializationException);
 
         final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
@@ -328,7 +335,10 @@ public class RecordCollectorImpl implements RecordCollector {
                 ),
                 "Invalid ProductionExceptionHandler response."
             );
-        } catch (final RuntimeException fatalUserException) {
+        } catch (final Exception fatalUserException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
             log.error(
                 String.format(
                     "Production error callback failed after serialization error for record %s: %s",
@@ -442,7 +452,10 @@ public class RecordCollectorImpl implements RecordCollector {
                     ),
                     "Invalid ProductionExceptionHandler response."
                 );
-            } catch (final RuntimeException fatalUserException) {
+            } catch (final Exception fatalUserException) {
+                // while Java distinguishes checked vs unchecked exceptions, other languages
+                // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                // (instead of `RuntimeException`) to work well with those languages
                 log.error(
                     "Production error callback failed after production error for record {}",
                     serializedRecord,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 5ddafe654e9..6f9fe989552 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -70,7 +70,10 @@ public class RecordDeserializer {
                 rawRecord.headers(),
                 rawRecord.leaderEpoch()
             );
-        } catch (final RuntimeException deserializationException) {
+        } catch (final Exception deserializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
             handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode().name());
             return null; //  'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null'
         }
@@ -78,7 +81,7 @@ public class RecordDeserializer {
 
     public static void handleDeserializationFailure(final DeserializationExceptionHandler deserializationExceptionHandler,
                                                     final ProcessorContext<?, ?> processorContext,
-                                                    final RuntimeException deserializationException,
+                                                    final Exception deserializationException,
                                                     final ConsumerRecord<byte[], byte[]> rawRecord,
                                                     final Logger log,
                                                     final Sensor droppedRecordsSensor,
@@ -100,7 +103,10 @@ public class RecordDeserializer {
                 deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException),
                 "Invalid DeserializationExceptionHandler response."
             );
-        } catch (final RuntimeException fatalUserException) {
+        } catch (final Exception fatalUserException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
             log.error(
                 "Deserialization error callback failed after deserialization error for record {}",
                 rawRecord,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 92772c686af..1fdd298088b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -52,8 +52,6 @@ import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -883,18 +881,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         processTimeMs = 0L;
     }
 
-    private String getStacktraceString(final Throwable e) {
-        String stacktrace = null;
-        try (final StringWriter stringWriter = new StringWriter();
-             final PrintWriter printWriter = new PrintWriter(stringWriter)) {
-            e.printStackTrace(printWriter);
-            stacktrace = stringWriter.toString();
-        } catch (final IOException ioe) {
-            log.error("Encountered error extracting stacktrace from this exception", ioe);
-        }
-        return stacktrace;
-    }
-
     /**
      * @throws IllegalStateException if the current node is not null
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
@@ -937,7 +923,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             throw createStreamsException(node.name(), e.getCause());
         } catch (final TaskCorruptedException | TaskMigratedException e) {
             throw e;
-        } catch (final RuntimeException processingException) {
+        } catch (final Exception processingException) {
+            // while Java distinguishes checked vs unchecked exceptions, other languages
+            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+            // (instead of `RuntimeException`) to work well with those languages
             final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
                 null,
                 recordContext.topic(),
@@ -955,7 +944,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                     processingExceptionHandler.handle(errorHandlerContext, null, processingException),
                     "Invalid ProcessingExceptionHandler response."
                 );
-            } catch (final RuntimeException fatalUserException) {
+            } catch (final Exception fatalUserException) {
+                // while Java distinguishes checked vs unchecked exceptions, other languages
+                // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                // (instead of `RuntimeException`) to work well with those languages
                 log.error(
                     "Processing error callback failed after processing error for record: {}",
                     errorHandlerContext,

Reply via email to