This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 2127ae63290 KAFKA-17994 Checked exceptions are not handled (#17817)
2127ae63290 is described below

commit 2127ae63290e1309e0d1ebbc84f6ef3a6f81bae2
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Nov 15 04:36:03 2024 -0800

    KAFKA-17994 Checked exceptions are not handled (#17817)
    
    Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../internals/GlobalStateManagerImpl.java          |  5 ++++-
 .../streams/processor/internals/ProcessorNode.java | 10 +++++++--
 .../processor/internals/RecordCollectorImpl.java   | 25 ++++++++++++++++------
 .../processor/internals/RecordDeserializer.java    | 12 ++++++++---
 .../streams/processor/internals/StreamTask.java    | 24 +++++++--------------
 5 files changed, 48 insertions(+), 28 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 12d4c6c603d..24cf51a67be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -319,7 +319,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                                 record.headers()));
                             restoreCount++;
                         }
-                    } catch (final RuntimeException deserializationException) {
+                    } catch (final Exception deserializationException) {
+                        // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to 
catch `Exception`
+                        // (instead of `RuntimeException`) to work well with 
those languages
                         handleDeserializationFailure(
                             deserializationExceptionHandler,
                             globalProcessorContext,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 2b945f2da29..c1f9c423049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -203,7 +203,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         } catch (final FailedProcessingException | TaskCorruptedException | 
TaskMigratedException e) {
             // Rethrow exceptions that should not be handled here
             throw e;
-        } catch (final RuntimeException processingException) {
+        } catch (final Exception processingException) {
+            // while Java distinguishes checked vs unchecked exceptions, other 
languages
+            // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+            // (instead of `RuntimeException`) to work well with those 
languages
             final ErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext(
                 null, // only required to pass for 
DeserializationExceptionHandler
                 internalProcessorContext.topic(),
@@ -220,7 +223,10 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
                     processingExceptionHandler.handle(errorHandlerContext, 
record, processingException),
                     "Invalid ProductionExceptionHandler response."
                 );
-            } catch (final RuntimeException fatalUserException) {
+            } catch (final Exception fatalUserException) {
+                // while Java distinguishes checked vs unchecked exceptions, 
other languages
+                // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+                // (instead of `RuntimeException`) to work well with those 
languages
                 log.error(
                     "Processing error callback failed after processing error 
for record: {}",
                     errorHandlerContext,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index b725feb4efa..49878b2e883 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -211,7 +211,10 @@ public class RecordCollectorImpl implements RecordCollector {
                 key,
                 keySerializer,
                 exception);
-        } catch (final RuntimeException serializationException) {
+        } catch (final Exception serializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other 
languages
+            // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+            // (instead of `RuntimeException`) to work well with those 
languages
             handleException(
                 ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
                 topic,
@@ -222,7 +225,8 @@ public class RecordCollectorImpl implements RecordCollector {
                 timestamp,
                 processorNodeId,
                 context,
-                serializationException);
+                serializationException
+            );
             return;
         }
 
@@ -235,7 +239,10 @@ public class RecordCollectorImpl implements RecordCollector {
                 value,
                 valueSerializer,
                 exception);
-        } catch (final RuntimeException serializationException) {
+        } catch (final Exception serializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other 
languages
+            // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+            // (instead of `RuntimeException`) to work well with those 
languages
             handleException(
                 ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
                 topic,
@@ -313,7 +320,7 @@ public class RecordCollectorImpl implements RecordCollector {
                                         final Long timestamp,
                                         final String processorNodeId,
                                         final InternalProcessorContext<Void, 
Void> context,
-                                        final RuntimeException 
serializationException) {
+                                        final Exception 
serializationException) {
         log.debug(String.format("Error serializing record for topic %s", 
topic), serializationException);
 
         final ProducerRecord<K, V> record = new ProducerRecord<>(topic, 
partition, timestamp, key, value, headers);
@@ -329,7 +336,10 @@ public class RecordCollectorImpl implements RecordCollector {
                 ),
                 "Invalid ProductionExceptionHandler response."
             );
-        } catch (final RuntimeException fatalUserException) {
+        } catch (final Exception fatalUserException) {
+            // while Java distinguishes checked vs unchecked exceptions, other 
languages
+            // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+            // (instead of `RuntimeException`) to work well with those 
languages
             log.error(
                 String.format(
                     "Production error callback failed after serialization 
error for record %s: %s",
@@ -446,7 +456,10 @@ public class RecordCollectorImpl implements RecordCollector {
                         ),
                         "Invalid ProductionExceptionHandler response."
                     );
-                } catch (final RuntimeException fatalUserException) {
+                } catch (final Exception fatalUserException) {
+                    // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                    // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+                    // (instead of `RuntimeException`) to work well with those 
languages
                     log.error(
                         "Production error callback failed after production 
error for record {}",
                         serializedRecord,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index aefa15da660..a4179509c6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -71,7 +71,10 @@ public class RecordDeserializer {
                 rawRecord.headers(),
                 Optional.empty()
             );
-        } catch (final RuntimeException deserializationException) {
+        } catch (final Exception deserializationException) {
+            // while Java distinguishes checked vs unchecked exceptions, other 
languages
+            // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+            // (instead of `RuntimeException`) to work well with those 
languages
             handleDeserializationFailure(deserializationExceptionHandler, 
processorContext, deserializationException, rawRecord, log, 
droppedRecordsSensor, sourceNode().name());
             return null; //  'handleDeserializationFailure' would either throw 
or swallow -- if we swallow we need to skip the record by returning 'null'
         }
@@ -79,7 +82,7 @@ public class RecordDeserializer {
 
     public static void handleDeserializationFailure(final 
DeserializationExceptionHandler deserializationExceptionHandler,
                                                     final ProcessorContext<?, 
?> processorContext,
-                                                    final RuntimeException 
deserializationException,
+                                                    final Exception 
deserializationException,
                                                     final 
ConsumerRecord<byte[], byte[]> rawRecord,
                                                     final Logger log,
                                                     final Sensor 
droppedRecordsSensor,
@@ -101,7 +104,10 @@ public class RecordDeserializer {
                 deserializationExceptionHandler.handle(errorHandlerContext, 
rawRecord, deserializationException),
                 "Invalid DeserializationExceptionHandler response."
             );
-        } catch (final RuntimeException fatalUserException) {
+        } catch (final Exception fatalUserException) {
+            // while Java distinguishes checked vs unchecked exceptions, other 
languages
+            // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+            // (instead of `RuntimeException`) to work well with those 
languages
             log.error(
                 "Deserialization error callback failed after deserialization 
error for record {}",
                 rawRecord,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 76df4693bf6..59ed6b92cd9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -52,8 +52,6 @@ import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -884,18 +882,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         processTimeMs = 0L;
     }
 
-    private String getStacktraceString(final Throwable e) {
-        String stacktrace = null;
-        try (final StringWriter stringWriter = new StringWriter();
-             final PrintWriter printWriter = new PrintWriter(stringWriter)) {
-            e.printStackTrace(printWriter);
-            stacktrace = stringWriter.toString();
-        } catch (final IOException ioe) {
-            log.error("Encountered error extracting stacktrace from this 
exception", ioe);
-        }
-        return stacktrace;
-    }
-
     /**
      * @throws IllegalStateException if the current node is not null
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
@@ -938,7 +924,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             throw createStreamsException(node.name(), e.getCause());
         } catch (final TaskCorruptedException | TaskMigratedException e) {
             throw e;
-        } catch (final RuntimeException processingException) {
+        } catch (final Exception processingException) {
+            // while Java distinguishes checked vs unchecked exceptions, other 
languages
+            // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+            // (instead of `RuntimeException`) to work well with those 
languages
             final ErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext(
                 null,
                 recordContext.topic(),
@@ -956,7 +945,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                     processingExceptionHandler.handle(errorHandlerContext, 
null, processingException),
                     "Invalid ProcessingExceptionHandler response."
                 );
-            } catch (final RuntimeException fatalUserException) {
+            } catch (final Exception fatalUserException) {
+                // while Java distinguishes checked vs unchecked exceptions, 
other languages
+                // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
+                // (instead of `RuntimeException`) to work well with those 
languages
                 log.error(
                     "Processing error callback failed after processing error 
for record: {}",
                     errorHandlerContext,

Reply via email to