This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new ccb04acb56b Revert "KAFKA-16508: Streams custom handler should handle the timeout exceptions (#16450)" (#16738)
ccb04acb56b is described below

commit ccb04acb56b5a29f6de4dfc7b98897404eb14c0c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 10:29:02 2024 -0700

    Revert "KAFKA-16508: Streams custom handler should handle the timeout exceptions (#16450)" (#16738)
    
    This reverts commit 15a4501bded513822485dd85fa6258e16f1571ca.
    
    We consider this change backward incompatible and will fix forward in the 4.0
    release via KIP-1065, but we need to revert it for the 3.9 release.
    
    Reviewers: Josep Prat <[email protected]>, Bill Bejeck <[email protected]>
---
 .../processor/internals/RecordCollectorImpl.java   |  13 +-
 .../integration/CustomHandlerIntegrationTest.java  | 166 ---------------------
 .../processor/internals/RecordCollectorTest.java   |  50 -------
 3 files changed, 1 insertion(+), 228 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index de4afc2c924..42b8d4f082b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -34,7 +34,6 @@ import 
org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
@@ -389,7 +388,7 @@ public class RecordCollectorImpl implements RecordCollector 
{
                 "indicating the task may be migrated out";
             sendException.set(new TaskMigratedException(errorMessage, 
exception));
         } else {
-            if (isRetriable(exception)) {
+            if (exception instanceof RetriableException) {
                 errorMessage += "\nThe broker is either slow or in bad state 
(like not having enough replicas) in responding the request, " +
                     "or the connection to broker was interrupted sending the 
request or receiving the response. " +
                     "\nConsider overwriting `max.block.ms` and /or " +
@@ -419,16 +418,6 @@ public class RecordCollectorImpl implements 
RecordCollector {
         log.error(errorMessage, exception);
     }
 
-    /**
-     * The `TimeoutException` with root cause 
`UnknownTopicOrPartitionException` is considered as non-retriable
-     * (despite `TimeoutException` being a subclass of `RetriableException`, 
this particular case is explicitly excluded).
-    */
-    private boolean isRetriable(final Exception exception) {
-        return exception instanceof RetriableException &&
-                (!(exception instanceof TimeoutException) || 
exception.getCause() == null
-                        || !(exception.getCause() instanceof 
UnknownTopicOrPartitionException));
-    }
-
     private boolean isFatalException(final Exception exception) {
         final boolean securityException = exception instanceof 
AuthenticationException ||
             exception instanceof AuthorizationException ||
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
deleted file mode 100644
index 3eea2ec7d84..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.Timeout;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-
-
-@Timeout(600)
-@Tag("integration")
-public class CustomHandlerIntegrationTest {
-    private static final int NUM_BROKERS = 1;
-    private static final int NUM_THREADS = 2;
-    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS,
-            
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", 
"false")));
-
-    @BeforeAll
-    public static void startCluster() throws IOException {
-        CLUSTER.start();
-    }
-
-    @AfterAll
-    public static void closeCluster() {
-        CLUSTER.stop();
-    }
-
-    private final long timeout = 60000;
-
-    // topic name
-    private static final String STREAM_INPUT = "STREAM_INPUT";
-    private static final String NON_EXISTING_TOPIC = "non_existing_topic";
-
-    private KafkaStreams kafkaStreams;
-    AtomicReference<Throwable> caughtException;
-    Topology topology;
-    private String appId;
-
-    @BeforeEach
-    public void before(final TestInfo testInfo) throws InterruptedException {
-        final StreamsBuilder builder = new StreamsBuilder();
-        CLUSTER.createTopics(STREAM_INPUT);
-        caughtException = new AtomicReference<>();
-        final String safeTestName = safeUniqueTestName(testInfo);
-        appId = "app-" + safeTestName;
-
-
-        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
-                .to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(), 
Serdes.String()));
-        produceRecords();
-        topology = builder.build();
-    }
-
-    @AfterEach
-    public void after() throws InterruptedException {
-        CLUSTER.deleteTopics(STREAM_INPUT);
-        if (kafkaStreams != null) {
-            kafkaStreams.close();
-            kafkaStreams.cleanUp();
-        }
-    }
-
-    private void produceRecords() {
-        final Properties props = TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties());
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                STREAM_INPUT,
-                Collections.singletonList(new KeyValue<>(1, "A")),
-                props,
-                CLUSTER.time.milliseconds() + 2
-        );
-    }
-
-    private Properties getCommonProperties() {
-        final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class);
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
-        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
NUM_THREADS);
-        return streamsConfiguration;
-    }
-
-    private void closeApplication(final Properties streamsConfiguration) 
throws Exception {
-        kafkaStreams.close();
-        kafkaStreams.cleanUp();
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-    }
-
-    @Test
-    public void 
shouldThrowStreamsExceptionWithMissingTopicAndDefaultExceptionHandler() throws 
Exception {
-        final Properties streamsConfiguration = getCommonProperties();
-        kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
-        kafkaStreams.setUncaughtExceptionHandler(e -> {
-            caughtException.set(e);
-            return 
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-        });
-        kafkaStreams.start();
-        TestUtils.waitForCondition(
-                () -> kafkaStreams.state() == State.RUNNING,
-                timeout,
-                () -> "Kafka Streams application did not reach state RUNNING 
in " + timeout + " ms");
-        while (true) {
-            if (caughtException.get() != null) {
-                final Throwable throwable = caughtException.get();
-                assertInstanceOf(StreamsException.class, throwable);
-                assertInstanceOf(TimeoutException.class, throwable.getCause());
-                assertInstanceOf(UnknownTopicOrPartitionException.class, 
throwable.getCause().getCause());
-                closeApplication(streamsConfiguration);
-                break;
-            } else {
-                Thread.sleep(100);
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 26231028851..735a7b7910b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -34,7 +34,6 @@ import 
org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
@@ -93,7 +92,6 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -1359,54 +1357,6 @@ public class RecordCollectorTest {
         collector.closeClean();
     }
 
-    @Test
-    public void 
shouldThrowStreamsExceptionOnUnknownTopicOrPartitionExceptionWithDefaultExceptionHandler()
 {
-        final KafkaException exception = new TimeoutException("KABOOM!", new 
UnknownTopicOrPartitionException());
-        final RecordCollector collector = new RecordCollectorImpl(
-                logContext,
-                taskId,
-                getExceptionalStreamsProducerOnSend(exception),
-                productionExceptionHandler,
-                streamsMetrics,
-                topology
-        );
-
-        collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, sinkNodeName, context, streamPartitioner);
-
-        // With default handler which returns FAIL, flush() throws 
StreamsException with TimeoutException cause,
-        // otherwise it would throw a TaskCorruptedException with null cause
-        final StreamsException thrown = assertThrows(StreamsException.class, 
collector::flush);
-        assertEquals(exception, thrown.getCause());
-        assertThat(
-                thrown.getMessage(),
-                equalTo("Error encountered sending record to topic topic for 
task 0_0 due to:" +
-                        "\norg.apache.kafka.common.errors.TimeoutException: 
KABOOM!" +
-                        "\nException handler choose to FAIL the processing, no 
more records would be sent.")
-        );
-    }
-
-    @Test
-    public void 
shouldNotThrowTaskCorruptedExceptionOnUnknownTopicOrPartitionExceptionUsingAlwaysContinueExceptionHandler()
 {
-        final KafkaException exception = new TimeoutException("KABOOM!", new 
UnknownTopicOrPartitionException());
-        final RecordCollector collector = new RecordCollectorImpl(
-                logContext,
-                taskId,
-                getExceptionalStreamsProducerOnSend(exception),
-                new ProductionExceptionHandlerMock(
-                    
ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE,
-                    context,
-                    sinkNodeName,
-                    taskId
-                ),
-                streamsMetrics,
-                topology
-        );
-
-        collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, sinkNodeName, context, streamPartitioner);
-
-        assertDoesNotThrow(collector::flush);
-    }
-
     @Test
     public void shouldNotAbortTxnOnEOSCloseDirtyIfNothingSent() {
         final AtomicBoolean functionCalled = new AtomicBoolean(false);

Reply via email to