This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new ccb04acb56b Revert "KAFKA-16508: Streams custom handler should handle
the timeout exceptions (#16450)" (#16738)
ccb04acb56b is described below
commit ccb04acb56b5a29f6de4dfc7b98897404eb14c0c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 10:29:02 2024 -0700
Revert "KAFKA-16508: Streams custom handler should handle the timeout
exceptions (#16450)" (#16738)
This reverts commit 15a4501bded513822485dd85fa6258e16f1571ca.
We consider this change backward incompatible and will fix forward for 4.0
release via KIP-1065, but need to revert for 3.9 release.
Reviewers: Josep Prat <[email protected]>, Bill Bejeck <[email protected]>
---
.../processor/internals/RecordCollectorImpl.java | 13 +-
.../integration/CustomHandlerIntegrationTest.java | 166 ---------------------
.../processor/internals/RecordCollectorTest.java | 50 -------
3 files changed, 1 insertion(+), 228 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index de4afc2c924..42b8d4f082b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -34,7 +34,6 @@ import
org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
@@ -389,7 +388,7 @@ public class RecordCollectorImpl implements RecordCollector
{
"indicating the task may be migrated out";
sendException.set(new TaskMigratedException(errorMessage,
exception));
} else {
- if (isRetriable(exception)) {
+ if (exception instanceof RetriableException) {
errorMessage += "\nThe broker is either slow or in bad state
(like not having enough replicas) in responding the request, " +
"or the connection to broker was interrupted sending the
request or receiving the response. " +
"\nConsider overwriting `max.block.ms` and /or " +
@@ -419,16 +418,6 @@ public class RecordCollectorImpl implements
RecordCollector {
log.error(errorMessage, exception);
}
- /**
- * The `TimeoutException` with root cause
`UnknownTopicOrPartitionException` is considered as non-retriable
- * (despite `TimeoutException` being a subclass of `RetriableException`,
this particular case is explicitly excluded).
- */
- private boolean isRetriable(final Exception exception) {
- return exception instanceof RetriableException &&
- (!(exception instanceof TimeoutException) ||
exception.getCause() == null
- || !(exception.getCause() instanceof
UnknownTopicOrPartitionException));
- }
-
private boolean isFatalException(final Exception exception) {
final boolean securityException = exception instanceof
AuthenticationException ||
exception instanceof AuthorizationException ||
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
deleted file mode 100644
index 3eea2ec7d84..00000000000
---
a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.Timeout;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-
-
-@Timeout(600)
-@Tag("integration")
-public class CustomHandlerIntegrationTest {
- private static final int NUM_BROKERS = 1;
- private static final int NUM_THREADS = 2;
- public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
-
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable",
"false")));
-
- @BeforeAll
- public static void startCluster() throws IOException {
- CLUSTER.start();
- }
-
- @AfterAll
- public static void closeCluster() {
- CLUSTER.stop();
- }
-
- private final long timeout = 60000;
-
- // topic name
- private static final String STREAM_INPUT = "STREAM_INPUT";
- private static final String NON_EXISTING_TOPIC = "non_existing_topic";
-
- private KafkaStreams kafkaStreams;
- AtomicReference<Throwable> caughtException;
- Topology topology;
- private String appId;
-
- @BeforeEach
- public void before(final TestInfo testInfo) throws InterruptedException {
- final StreamsBuilder builder = new StreamsBuilder();
- CLUSTER.createTopics(STREAM_INPUT);
- caughtException = new AtomicReference<>();
- final String safeTestName = safeUniqueTestName(testInfo);
- appId = "app-" + safeTestName;
-
-
- builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
- .to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(),
Serdes.String()));
- produceRecords();
- topology = builder.build();
- }
-
- @AfterEach
- public void after() throws InterruptedException {
- CLUSTER.deleteTopics(STREAM_INPUT);
- if (kafkaStreams != null) {
- kafkaStreams.close();
- kafkaStreams.cleanUp();
- }
- }
-
- private void produceRecords() {
- final Properties props = TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties());
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- STREAM_INPUT,
- Collections.singletonList(new KeyValue<>(1, "A")),
- props,
- CLUSTER.time.milliseconds() + 2
- );
- }
-
- private Properties getCommonProperties() {
- final Properties streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
- streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class);
-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
- streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
NUM_THREADS);
- return streamsConfiguration;
- }
-
- private void closeApplication(final Properties streamsConfiguration)
throws Exception {
- kafkaStreams.close();
- kafkaStreams.cleanUp();
- IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
- }
-
- @Test
- public void
shouldThrowStreamsExceptionWithMissingTopicAndDefaultExceptionHandler() throws
Exception {
- final Properties streamsConfiguration = getCommonProperties();
- kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
- kafkaStreams.setUncaughtExceptionHandler(e -> {
- caughtException.set(e);
- return
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
- });
- kafkaStreams.start();
- TestUtils.waitForCondition(
- () -> kafkaStreams.state() == State.RUNNING,
- timeout,
- () -> "Kafka Streams application did not reach state RUNNING
in " + timeout + " ms");
- while (true) {
- if (caughtException.get() != null) {
- final Throwable throwable = caughtException.get();
- assertInstanceOf(StreamsException.class, throwable);
- assertInstanceOf(TimeoutException.class, throwable.getCause());
- assertInstanceOf(UnknownTopicOrPartitionException.class,
throwable.getCause().getCause());
- closeApplication(streamsConfiguration);
- break;
- } else {
- Thread.sleep(100);
- }
- }
- }
-}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 26231028851..735a7b7910b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -34,7 +34,6 @@ import
org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -93,7 +92,6 @@ import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -1359,54 +1357,6 @@ public class RecordCollectorTest {
collector.closeClean();
}
- @Test
- public void
shouldThrowStreamsExceptionOnUnknownTopicOrPartitionExceptionWithDefaultExceptionHandler()
{
- final KafkaException exception = new TimeoutException("KABOOM!", new
UnknownTopicOrPartitionException());
- final RecordCollector collector = new RecordCollectorImpl(
- logContext,
- taskId,
- getExceptionalStreamsProducerOnSend(exception),
- productionExceptionHandler,
- streamsMetrics,
- topology
- );
-
- collector.send(topic, "3", "0", null, null, stringSerializer,
stringSerializer, sinkNodeName, context, streamPartitioner);
-
- // With default handler which returns FAIL, flush() throws
StreamsException with TimeoutException cause,
- // otherwise it would throw a TaskCorruptedException with null cause
- final StreamsException thrown = assertThrows(StreamsException.class,
collector::flush);
- assertEquals(exception, thrown.getCause());
- assertThat(
- thrown.getMessage(),
- equalTo("Error encountered sending record to topic topic for
task 0_0 due to:" +
- "\norg.apache.kafka.common.errors.TimeoutException:
KABOOM!" +
- "\nException handler choose to FAIL the processing, no
more records would be sent.")
- );
- }
-
- @Test
- public void
shouldNotThrowTaskCorruptedExceptionOnUnknownTopicOrPartitionExceptionUsingAlwaysContinueExceptionHandler()
{
- final KafkaException exception = new TimeoutException("KABOOM!", new
UnknownTopicOrPartitionException());
- final RecordCollector collector = new RecordCollectorImpl(
- logContext,
- taskId,
- getExceptionalStreamsProducerOnSend(exception),
- new ProductionExceptionHandlerMock(
-
ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE,
- context,
- sinkNodeName,
- taskId
- ),
- streamsMetrics,
- topology
- );
-
- collector.send(topic, "3", "0", null, null, stringSerializer,
stringSerializer, sinkNodeName, context, streamPartitioner);
-
- assertDoesNotThrow(collector::flush);
- }
-
@Test
public void shouldNotAbortTxnOnEOSCloseDirtyIfNothingSent() {
final AtomicBoolean functionCalled = new AtomicBoolean(false);