This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fb30362205 Fix multi-topic ingestion halt when one Kafka topic is 
deleted (#18560)
8fb30362205 is described below

commit 8fb30362205fa382eb1677e15b0c7788d491f0e1
Author: Rekha Seethamraju <[email protected]>
AuthorDate: Wed May 27 01:30:55 2026 -0700

    Fix multi-topic ingestion halt when one Kafka topic is deleted (#18560)
    
    * Fix multi-topic ingestion halt when one Kafka topic is deleted
    
    In a multi-topic table, if one Kafka topic becomes inaccessible (e.g.
    deleted externally while a backfill topic is still registered in the
    table config), PartitionGroupMetadataFetcher.fetchMultipleStreams() was
    re-throwing the exception from computePartitionGroupMetadata(). This
    caused the entire getStreamMetadataList() call to fail, which in turn
    caused RealtimeSegmentValidationManager.ensureAllPartitionsConsuming()
    to throw, halting ingestion repair for all topics on the table — not
    just the inaccessible one.
    
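    Fix: catch PermanentConsumerException (a confirmed-permanent failure, e.g.
    a deleted topic) per-topic in fetchMultipleStreams(), log it, and continue
    fetching the remaining topics, so healthy topics proceed normally through
    ensureAllPartitionsConsuming(). The Kafka 3.0 and 4.0 metadata providers
    now throw PermanentConsumerException when the topic does not exist. Any
    other non-transient exception is still recorded and re-thrown as before.
    
    An earlier revision of this change also recorded each failed topic name in
    _failedTopics and had the caller (PinotTableIdealStateBuilder) read
    getFailedTopics() to emit a PARTITION_GROUP_METADATA_FETCH_ERROR metric
    tagged with the topic name. That tracking was removed in the follow-up
    commits listed below, so the final change only logs the inaccessible topic.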
    
    TransientConsumerException handling is unchanged — transient failures
    still return false to trigger a retry via the retry policy.
    
    * Removed failed-topic tracking, since we no longer need to store the failed topics
    
    * Clean up failed topics
---
 .../kafka30/KafkaStreamMetadataProvider.java       |   3 +-
 .../kafka40/KafkaStreamMetadataProvider.java       |   3 +-
 .../spi/stream/PartitionGroupMetadataFetcher.java  |   6 +
 .../stream/PartitionGroupMetadataFetcherTest.java  | 128 +++++++++++++++++++++
 4 files changed, 138 insertions(+), 2 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 18dcd125eab..ebc0a044ccd 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.PermanentConsumerException;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -384,7 +385,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
 
     if (lastError != null) {
       if (topicMissing) {
-        throw new RuntimeException("Topic does not exist: " + _topic);
+        throw new PermanentConsumerException(new RuntimeException("Topic does 
not exist: " + _topic));
       }
       if (lastError instanceof TransientConsumerException) {
         throw (TransientConsumerException) lastError;
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
index bd712ac6fb6..4a9ff545b67 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.PermanentConsumerException;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -382,7 +383,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
 
     if (lastError != null) {
       if (topicMissing) {
-        throw new RuntimeException("Topic does not exist: " + _topic);
+        throw new PermanentConsumerException(new RuntimeException("Topic does 
not exist: " + _topic));
       }
       if (lastError instanceof TransientConsumerException) {
         throw (TransientConsumerException) lastError;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 2e0443228d6..edf6d6ccdd4 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -151,6 +151,12 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
         LOGGER.warn("Transient Exception: Could not get StreamMetadata for 
topic {}", topicName, e);
         _exception = e;
         return Boolean.FALSE;
+      } catch (PermanentConsumerException e) {
+        // A confirmed-permanent failure (e.g. topic deleted from Kafka) must 
not block metadata
+        // fetching for the remaining healthy topics in a multi-topic table. 
Log, record, and
+        // continue — the caller emits a metric and healthy topics proceed 
normally.
+        LOGGER.warn("Permanent failure fetching StreamMetadata for topic {}, 
skipping in multi-topic fetch",
+            topicName, e);
       } catch (Exception e) {
         LOGGER.warn("Could not get StreamMetadata for topic {}", topicName, e);
         _exception = e;
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
index 1c229b602da..3a175ef7fcd 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
@@ -366,6 +366,134 @@ public class PartitionGroupMetadataFetcherTest {
     }
   }
 
+  /**
+   * When one topic in a multi-topic table is inaccessible (e.g. deleted from 
Kafka), the fetcher must
+   * continue fetching metadata for the remaining topics and return partial 
results rather than re-throwing
+   * and killing ingestion for all healthy topics.
+   */
+  @Test
+  public void testFetchMultipleStreamsOneTopicPermanentFailure()
+      throws Exception {
+    StreamConfig streamConfig1 = createMockStreamConfig("topic1", 
"test-table_REALTIME", false);
+    StreamConfig streamConfig2 = createMockStreamConfig("topic2-deleted", 
"test-table_REALTIME", false);
+    List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1, 
streamConfig2);
+
+    StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+    PartitionGroupMetadata metadata = new PartitionGroupMetadata(0, offset);
+
+    // topic1 succeeds; topic2 throws PermanentConsumerException, which the fetcher skips
+    StreamMetadataProvider goodProvider = mock(StreamMetadataProvider.class);
+    when(goodProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), 
anyBoolean())).thenReturn(Collections.singletonList(metadata));
+
+    StreamMetadataProvider badProvider = mock(StreamMetadataProvider.class);
+    when(badProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean()))
+        .thenThrow(new PermanentConsumerException(new RuntimeException("Topic 
does not exist")));
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    when(factory.createStreamMetadataProvider(anyString()))
+        .thenReturn(goodProvider)   // first provider created -> topic1
+        .thenReturn(badProvider);   // second provider created -> topic2
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, Collections.emptyList(), Collections.emptyList(), 
false);
+
+      Boolean result = fetcher.call();
+
+      // Fetch succeeds overall — only topic1's metadata is returned; topic2 is logged (WARN) and skipped
+      Assert.assertTrue(result);
+      Assert.assertEquals(fetcher.getStreamMetadataList().size(), 1);
+      
Assert.assertEquals(fetcher.getStreamMetadataList().get(0).getNumPartitions(), 
1);
+      Assert.assertNull(fetcher.getException());  // a skipped permanent failure is not recorded as the fetch exception
+    }
+  }
+
+  @Test
+  public void testFetchMultipleStreamsFailedTopicDoesNotBlockOthersOnRetry()
+      throws Exception {
+    StreamConfig streamConfig1 = createMockStreamConfig("topic1", 
"test-table_REALTIME", false);
+    StreamConfig streamConfig2 = createMockStreamConfig("topic2", 
"test-table_REALTIME", false);
+    List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1, 
streamConfig2);
+
+    StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+    PartitionGroupMetadata metadata = new PartitionGroupMetadata(0, offset);
+
+    // One provider serves both topics, so its stubs are consumed in fetch order: call 1 -> topic2 throws, call 2 -> both succeed.
+    StreamMetadataProvider provider = mock(StreamMetadataProvider.class);
+    when(provider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean()))
+        .thenReturn(Collections.singletonList(metadata))           // topic1 
first call
+        .thenThrow(new PermanentConsumerException(new RuntimeException("Topic 
does not exist")))   // topic2 first call
+        .thenReturn(Collections.singletonList(metadata))           // topic1 
second call
+        .thenReturn(Collections.singletonList(metadata));          // topic2 
second call
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    
when(factory.createStreamMetadataProvider(anyString())).thenReturn(provider);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, Collections.emptyList(), Collections.emptyList(), 
false);
+
+      // First call: topic2 fails — only topic1's metadata returned, no 
exception
+      Boolean result1 = fetcher.call();
+      Assert.assertTrue(result1);
+      Assert.assertEquals(fetcher.getStreamMetadataList().size(), 1);
+      Assert.assertNull(fetcher.getException());
+
+      // Second call: both succeed — the previously failed topic is fetched again, not permanently excluded
+      Boolean result2 = fetcher.call();
+      Assert.assertTrue(result2);
+      Assert.assertEquals(fetcher.getStreamMetadataList().size(), 2);
+    }
+  }
+
+  @Test
+  public void testFetchMultipleStreamsNonPermanentExceptionStillPropagates()
+      throws Exception {
+    StreamConfig streamConfig1 = createMockStreamConfig("topic1", 
"test-table_REALTIME", false);
+    StreamConfig streamConfig2 = createMockStreamConfig("topic2", 
"test-table_REALTIME", false);
+    List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1, 
streamConfig2);
+
+    StreamMetadataProvider goodProvider = mock(StreamMetadataProvider.class);
+    when(goodProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean()))
+        .thenReturn(Collections.singletonList(new PartitionGroupMetadata(0, 
mock(StreamPartitionMsgOffset.class))));
+
+    // Only PermanentConsumerException is skipped; any other exception (e.g. an auth failure) must still propagate
+    StreamMetadataProvider badProvider = mock(StreamMetadataProvider.class);
+    when(badProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean()))
+        .thenThrow(new RuntimeException("Auth failure"));
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    when(factory.createStreamMetadataProvider(anyString()))
+        .thenReturn(goodProvider)   // first provider created -> topic1
+        .thenReturn(badProvider);   // second provider created -> topic2
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, Collections.emptyList(), Collections.emptyList(), 
false);
+
+      try {
+        fetcher.call();
+        Assert.fail("Expected RuntimeException to propagate");
+      } catch (RuntimeException e) {  // Assert.fail throws AssertionError (an Error), so it is not caught here
+        Assert.assertEquals(e.getMessage(), "Auth failure");
+      }
+    }
+  }
+
   private StreamConfig createMockStreamConfig(String topicName, String 
tableName, boolean isEphemeral) {
     StreamConfig streamConfig = mock(StreamConfig.class);
     when(streamConfig.getTopicName()).thenReturn(topicName);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to