This is an automated email from the ASF dual-hosted git repository.

jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e6f930ed9b fix: fix broker segment metadata cache refresh (#19625)
4e6f930ed9b is described below

commit 4e6f930ed9b4b4d4997ad7d01faba4499027fd07
Author: jtuglu1 <[email protected]>
AuthorDate: Wed Jun 24 13:52:38 2026 -0700

    fix: fix broker segment metadata cache refresh (#19625)
    
    Brokers can maintain a schema cache via segment metadata queries. 
Currently, if any of these queries times out, the remaining queries in the 
cycle are aborted until the next refresh. If you have a huge datasource delta 
(think 500k+ segments being scanned), such a query can fail or time out and 
cause schema discovery on the Broker to fail for other, unrelated datasources. 
Without centralized schema through the Coordinator, there is no 
intra-datasource atomicity guarantee w.r.t. schema discovery (it is 
best-effort), so dec [...]
    
    Introduces a segment/schemaCache/refresh/failed metric with a dataSource 
dimension, emitted once for each dataSource whose refresh fails. Alternatives 
would be to aggregate the failures and emit once at the end of the cycle, or 
to report them only as a warning/error log.
---
 docs/operations/metrics.md                         |   2 +
 .../metadata/AbstractSegmentMetadataCache.java     |  33 +++++-
 .../org/apache/druid/segment/metadata/Metric.java  |   1 +
 .../CoordinatorSegmentMetadataCacheTest.java       | 121 +++++++++++++++++++++
 4 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 8d71c6044f8..31c6813be6c 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -74,6 +74,7 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`segment/metadataCache/sync/time`|Time taken to poll segment metadata from 
the Coordinator and update the segment metadata cache. This metric is emitted 
only if [metadata cache](../configuration/index.md#sql) is enabled on the 
Broker.||Depends on the number of segments.|
 |`segment/schemaCache/refresh/count`|Number of segments refreshed in broker 
segment schema cache.|`dataSource`||
 |`segment/schemaCache/refresh/time`|Time taken to refresh segments in broker 
segment schema cache.|`dataSource`||
+|`segment/schemaCache/refresh/failed`|Number of dataSources whose schema 
refresh failed in the broker segment schema cache (for example, because a 
segment metadata query timed out). Emitted only when a refresh fails; the 
failed dataSource is skipped for that cycle and retried later, while other 
dataSources are unaffected. Recurring emission for the same dataSource means 
its schema may be stale or missing from the SQL schema.|`dataSource`||
 |`segment/schemaCache/poll/count`|Number of coordinator polls to fetch 
datasource schema.|||
 |`segment/schemaCache/poll/failed`|Number of failed coordinator polls to fetch 
datasource schema.|||
 |`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch 
datasource schema.|||
@@ -475,6 +476,7 @@ These metrics are emitted by the Druid Coordinator in every 
run of the correspon
 |`segment/schemaCache/refreshSkipped/count`|Number of segments for which 
schema refresh was skipped due to presence of segment metadata in datasource 
polled from coordinator.|`dataSource`||
 |`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed 
from the Broker cache due to segments being marked as unused.|`dataSource`||
 |`segment/schemaCache/refresh/time`|Time taken to refresh segments in 
coordinator segment schema cache.|`dataSource`||
+|`segment/schemaCache/refresh/failed`|Number of dataSources whose schema 
refresh failed in the coordinator segment schema cache (for example, because a 
segment metadata query timed out). Emitted only when a refresh fails; the 
failed dataSource is skipped for that cycle and retried later, while other 
dataSources are unaffected. Recurring emission for the same dataSource means 
its schema in the coordinator cache may be stale or missing.|`dataSource`||
 |`segment/schemaCache/backfill/count`|Number of segments for which schema was 
back filled in the database.|`dataSource`||
 |`segment/schemaCache/rowSignature/changed`|Emitted when the cached row 
signature on the Broker's segment metadata cache for a datasource changes, 
indicating schema evolution or some form of flapping.|`dataSource`||
 |`segment/schemaCache/rowSignature/column/count`|Number of columns in the row 
signature on the Broker's segment metadata cache for a datasource when it's 
initialized or updated.|`dataSource`||
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
index 3cb8dbd7600..03d5a44d6ad 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
@@ -45,6 +45,7 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
 import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
@@ -717,13 +718,43 @@ public abstract class AbstractSegmentMetadataCache<T 
extends DataSourceInformati
                 .add(segmentId);
     }
 
+    // Refresh each dataSource independently so one failure (e.g. a metadata 
query timeout) does not
+    // abort the cycle and starve the rest.
     for (Map.Entry<String, TreeSet<SegmentId>> entry : segmentMap.entrySet()) {
-      updatedSegmentIds.addAll(refreshSegmentsForDataSource(entry.getKey(), 
entry.getValue()));
+      final String dataSource = entry.getKey();
+      try {
+        updatedSegmentIds.addAll(refreshSegmentsForDataSource(dataSource, 
entry.getValue()));
+      }
+      catch (QueryInterruptedException e) {
+        // QueryInterruptedException also wraps ordinary query failures, not 
just interruption
+        // Don't emit failures for interrupted exceptions (shutdown signal, 
etc.)
+        if (e.getCause() instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          throw e;
+        }
+        recordDataSourceRefreshFailure(dataSource, e);
+      }
+      catch (Exception e) {
+        recordDataSourceRefreshFailure(dataSource, e);
+      }
     }
 
     return updatedSegmentIds;
   }
 
+  /**
+   * Logs a warning (with the exception) and emits {@link Metric#REFRESH_FAILED} 
with value 1 and a {@code dataSource} dimension for one dataSource whose refresh 
failed; it does not rethrow, so the calling loop moves on to the next dataSource.
+   */
+  private void recordDataSourceRefreshFailure(String dataSource, Exception e)
+  {
+    log.warn(e, "Failed to refresh segment schema for dataSource[%s]; skipping 
it this cycle.", dataSource);
+    emitMetric(
+        Metric.REFRESH_FAILED,
+        1,
+        new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, 
dataSource)
+    );
+  }
+
   private long recomputeIsRealtime(ImmutableSet<DruidServerMetadata> servers)
   {
     if (servers.isEmpty()) {
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java 
b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java
index 96e53a3a98a..7f952b4f498 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java
@@ -52,6 +52,7 @@ public class Metric
   public static final String REFRESHED_SEGMENTS = PREFIX + "refresh/count";
   public static final String REFRESH_SKIPPED_TOMBSTONES = PREFIX + 
"refresh/tombstone/count";
   public static final String REFRESH_DURATION_MILLIS = PREFIX + "refresh/time";
+  public static final String REFRESH_FAILED = PREFIX + "refresh/failed";
   public static final String DATASOURCE_REMOVED = PREFIX + 
"dataSource/removed";
   public static final String SCHEMA_ROW_SIGNATURE_CHANGED = PREFIX + 
"rowSignature/changed";
   public static final String SCHEMA_ROW_SIGNATURE_COLUMN_COUNT = PREFIX + 
"rowSignature/column/count";
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 4574e91d203..3688b93f818 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -47,6 +48,8 @@ import 
org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryTimeoutException;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -112,6 +115,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetadataCacheTestBase
@@ -2455,4 +2459,121 @@ public class CoordinatorSegmentMetadataCacheTest 
extends CoordinatorSegmentMetad
     Assert.assertEquals("m1", columnNames.get(2));
     Assert.assertEquals(ColumnType.LONG, 
fooRowSignature.getColumnType(columnNames.get(2)).get());
   }
+
+  /**
+   * A failure refreshing one dataSource (e.g. a SegmentMetadataQuery timeout) 
must not abort the whole
+   * refresh cycle: other dataSources are still refreshed, and a {@code 
segment/schemaCache/refresh/failed}
+   * metric is emitted for the failing dataSource.
+   */
+  @Test
+  public void testRefreshFailureForOneDatasourceIsIsolated() throws 
InterruptedException, IOException
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter("test", "test");
+    final CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        emitter,
+        segmentSchemaCache,
+        backFillQueue,
+        segmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
+    )
+    {
+      @Override
+      public Sequence<SegmentAnalysis> 
runSegmentMetadataQuery(Iterable<SegmentId> segments)
+      {
+        // Simulate a metadata query that times out for DATASOURCE1 but 
succeeds for everything else.
+        final SegmentId first = segments.iterator().next();
+        if (DATASOURCE1.equals(first.getDataSource())) {
+          throw new QueryTimeoutException("test-induced timeout for " + 
DATASOURCE1);
+        }
+        return super.runSegmentMetadataQuery(segments);
+      }
+    };
+
+    schema.onLeaderStart();
+    schema.awaitInitialization();
+
+    final Set<SegmentId> allSegmentIds = 
schema.getSegmentMetadataSnapshot().keySet();
+    emitter.flush();
+
+    // Must return normally: the DATASOURCE1 timeout is recorded, not propagated to the caller.
+    final Set<SegmentId> refreshed = schema.refreshSegments(allSegmentIds);
+
+    // The healthy dataSources were still refreshed despite DATASOURCE1 
failing.
+    Assert.assertTrue(
+        "expected a refreshed segment from " + DATASOURCE2,
+        refreshed.stream().anyMatch(id -> 
DATASOURCE2.equals(id.getDataSource()))
+    );
+    Assert.assertTrue(
+        "expected a refreshed segment from " + SOME_DATASOURCE,
+        refreshed.stream().anyMatch(id -> 
SOME_DATASOURCE.equals(id.getDataSource()))
+    );
+    // The failing dataSource produced no refreshed segments.
+    Assert.assertTrue(
+        "expected no refreshed segments from the failing " + DATASOURCE1,
+        refreshed.stream().noneMatch(id -> 
DATASOURCE1.equals(id.getDataSource()))
+    );
+
+    // A failure metric was emitted, dimensioned by the failing dataSource.
+    final List<Number> failures = emitter.getMetricValues(
+        Metric.REFRESH_FAILED,
+        ImmutableMap.of(DruidMetrics.DATASOURCE, DATASOURCE1)
+    );
+    Assert.assertFalse("expected a refresh/failed metric for " + DATASOURCE1, 
failures.isEmpty());
+    Assert.assertEquals(1, failures.get(0).intValue());
+  }
+
+  @Test
+  public void 
testLocalInterruptionPropagatesButWrappedQueryFailureIsIsolated() throws 
IOException
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter("test", "test");
+    final AtomicReference<Throwable> causeRef = new AtomicReference<>();
+    final CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        emitter,
+        segmentSchemaCache,
+        backFillQueue,
+        segmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
+    )
+    {
+      @Override
+      public Sequence<SegmentAnalysis> 
runSegmentMetadataQuery(Iterable<SegmentId> segments)
+      {
+        throw new QueryInterruptedException(causeRef.get());
+      }
+    };
+
+    final Set<SegmentId> segments = ImmutableSet.of(
+        SegmentId.of(DATASOURCE1, Intervals.of("2000/2001"), "v1", 0)
+    );
+
+    // Case 1 - cause is an InterruptedException (genuine local interruption): propagate, restore the interrupt flag, 
record no failure.
+    causeRef.set(new InterruptedException("test interrupt"));
+    Assert.assertThrows(QueryInterruptedException.class, () -> 
schema.refreshSegments(segments));
+    Assert.assertTrue("interrupt flag should be restored", 
Thread.interrupted()); // also clears it for the next case
+    Assert.assertTrue(
+        "local interruption must not emit a refresh/failed metric",
+        emitter.getMetricEvents(Metric.REFRESH_FAILED).isEmpty()
+    );
+
+    // Case 2 - ordinary failure wrapped in QueryInterruptedException (cause is not an InterruptedException): isolate it - 
no propagation, failure recorded.
+    causeRef.set(new RuntimeException("wrapped query failure"));
+    schema.refreshSegments(segments); // must not throw
+    Assert.assertFalse("interrupt flag must not be set for a wrapped failure", 
Thread.currentThread().isInterrupted());
+    final List<Number> failures = emitter.getMetricValues(
+        Metric.REFRESH_FAILED,
+        ImmutableMap.of(DruidMetrics.DATASOURCE, DATASOURCE1)
+    );
+    Assert.assertFalse("a wrapped query failure should emit refresh/failed", 
failures.isEmpty());
+    Assert.assertEquals(1, failures.get(0).intValue());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to