This is an automated email from the ASF dual-hosted git repository.
jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4e6f930ed9b fix: fix broker segment metadata cache refresh (#19625)
4e6f930ed9b is described below
commit 4e6f930ed9b4b4d4997ad7d01faba4499027fd07
Author: jtuglu1 <[email protected]>
AuthorDate: Wed Jun 24 13:52:38 2026 -0700
fix: fix broker segment metadata cache refresh (#19625)
Brokers can maintain a schema cache via segment metadata queries.
Currently, if any of these queries time out, the remaining queries are aborted
until the next refresh. If you have a huge datasource delta (think 500k+
segments being scanned), such a query can fail or time out and cause Broker
schema discovery to fail for other, unrelated datasources. Without centralized
schema through the Coordinator, there is no intra-datasource atomicity guarantee
w.r.t. schema discovery (it is just ASAP), so dec [...]
Introduces a segment/schemaCache/refresh/failed metric with a dataSource
dimension, emitted when a refresh fails. Alternatively, failures could be
aggregated and emitted once at the end of the cycle, or reported only as a
warning/error log.
---
docs/operations/metrics.md | 2 +
.../metadata/AbstractSegmentMetadataCache.java | 33 +++++-
.../org/apache/druid/segment/metadata/Metric.java | 1 +
.../CoordinatorSegmentMetadataCacheTest.java | 121 +++++++++++++++++++++
4 files changed, 156 insertions(+), 1 deletion(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 8d71c6044f8..31c6813be6c 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -74,6 +74,7 @@ Most metric values reset each emission period, as specified
in `druid.monitoring
|`segment/metadataCache/sync/time`|Time taken to poll segment metadata from
the Coordinator and update the segment metadata cache. This metric is emitted
only if [metadata cache](../configuration/index.md#sql) is enabled on the
Broker.||Depends on the number of segments.|
|`segment/schemaCache/refresh/count`|Number of segments refreshed in broker
segment schema cache.|`dataSource`||
|`segment/schemaCache/refresh/time`|Time taken to refresh segments in broker
segment schema cache.|`dataSource`||
+|`segment/schemaCache/refresh/failed`|Emitted with a value of 1 for each dataSource whose schema
refresh failed in the broker segment schema cache (for example, because of a segment
metadata query timeout). Emitted only when a refresh fails; the failed
dataSource is skipped for the cycle and retried later, while other dataSources
are unaffected. Recurring emission indicates that the dataSource's schema may be missing
from, or stale in, the SQL schema.|`dataSource`||
|`segment/schemaCache/poll/count`|Number of coordinator polls to fetch
datasource schema.|||
|`segment/schemaCache/poll/failed`|Number of failed coordinator polls to fetch
datasource schema.|||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch
datasource schema.|||
@@ -475,6 +476,7 @@ These metrics are emitted by the Druid Coordinator in every
run of the correspon
|`segment/schemaCache/refreshSkipped/count`|Number of segments for which
schema refresh was skipped due to presence of segment metadata in datasource
polled from coordinator.|`dataSource`||
|`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed
from the Broker cache due to segments being marked as unused.|`dataSource`||
|`segment/schemaCache/refresh/time`|Time taken to refresh segments in
coordinator segment schema cache.|`dataSource`||
+|`segment/schemaCache/refresh/failed`|Emitted with a value of 1 for each dataSource whose schema
refresh failed in the coordinator segment schema cache (for example, because of a segment
metadata query timeout). Emitted only when a refresh fails; the failed
dataSource is skipped for the cycle and retried later, while other dataSources
are unaffected. Recurring emission indicates that the dataSource's schema may be missing
from, or stale in, the coordinator segment schema cache.|`dataSource`||
|`segment/schemaCache/backfill/count`|Number of segments for which schema was
back filled in the database.|`dataSource`||
|`segment/schemaCache/rowSignature/changed`|Emitted when the cached row
signature on the Broker's segment metadata cache for a datasource changes,
indicating schema evolution or some form of flapping.|`dataSource`||
|`segment/schemaCache/rowSignature/column/count`|Number of columns in the row
signature on the Broker's segment metadata cache for a datasource when it's
initialized or updated.|`dataSource`||
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
index 3cb8dbd7600..03d5a44d6ad 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
@@ -45,6 +45,7 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
@@ -717,13 +718,43 @@ public abstract class AbstractSegmentMetadataCache<T
extends DataSourceInformati
.add(segmentId);
}
+ // Refresh each dataSource independently so one failure (e.g. a metadata
query timeout) does not
+ // abort the cycle and starve the rest.
for (Map.Entry<String, TreeSet<SegmentId>> entry : segmentMap.entrySet()) {
- updatedSegmentIds.addAll(refreshSegmentsForDataSource(entry.getKey(),
entry.getValue()));
+ final String dataSource = entry.getKey();
+ try {
+ updatedSegmentIds.addAll(refreshSegmentsForDataSource(dataSource,
entry.getValue()));
+ }
+ catch (QueryInterruptedException e) {
+ // QueryInterruptedException also wraps ordinary query failures, not just
genuine interruption; those are isolated and recorded like any other failure.
+ // A direct InterruptedException cause (e.g. shutdown) is not a refresh failure:
restore the interrupt flag and rethrow without emitting refresh/failed.
+ if (e.getCause() instanceof InterruptedException) { // NOTE(review): only the direct cause is checked; confirm an interrupt cannot arrive nested deeper in the cause chain
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ recordDataSourceRefreshFailure(dataSource, e);
+ }
+ catch (Exception e) { // NOTE(review): an interrupt surfacing as another exception type lands here and the loop moves on to the next dataSource; consider also checking Thread.currentThread().isInterrupted()
+ recordDataSourceRefreshFailure(dataSource, e);
+ }
}
return updatedSegmentIds;
}
+ /**
+ * Logs a warning and emits {@link Metric#REFRESH_FAILED} (value 1, dataSource dimension) for a dataSource whose
refresh threw; the caller then skips that dataSource for this cycle and continues with the rest.
+ */
+ private void recordDataSourceRefreshFailure(String dataSource, Exception e)
+ {
+ log.warn(e, "Failed to refresh segment schema for dataSource[%s]; skipping
it this cycle.", dataSource);
+ emitMetric(
+ Metric.REFRESH_FAILED,
+ 1,
+ new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE,
dataSource)
+ );
+ }
+
private long recomputeIsRealtime(ImmutableSet<DruidServerMetadata> servers)
{
if (servers.isEmpty()) {
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java
b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java
index 96e53a3a98a..7f952b4f498 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java
@@ -52,6 +52,7 @@ public class Metric
public static final String REFRESHED_SEGMENTS = PREFIX + "refresh/count";
public static final String REFRESH_SKIPPED_TOMBSTONES = PREFIX +
"refresh/tombstone/count";
public static final String REFRESH_DURATION_MILLIS = PREFIX + "refresh/time";
+ public static final String REFRESH_FAILED = PREFIX + "refresh/failed";
public static final String DATASOURCE_REMOVED = PREFIX +
"dataSource/removed";
public static final String SCHEMA_ROW_SIGNATURE_CHANGED = PREFIX +
"rowSignature/changed";
public static final String SCHEMA_ROW_SIGNATURE_COLUMN_COUNT = PREFIX +
"rowSignature/column/count";
diff --git
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 4574e91d203..3688b93f818 100644
---
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -47,6 +48,8 @@ import
org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -112,6 +115,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class CoordinatorSegmentMetadataCacheTest extends
CoordinatorSegmentMetadataCacheTestBase
@@ -2455,4 +2459,121 @@ public class CoordinatorSegmentMetadataCacheTest
extends CoordinatorSegmentMetad
Assert.assertEquals("m1", columnNames.get(2));
Assert.assertEquals(ColumnType.LONG,
fooRowSignature.getColumnType(columnNames.get(2)).get());
}
+
+ /**
+ * A failure refreshing one dataSource (e.g. a SegmentMetadataQuery timeout)
must not abort the whole
+ * refresh cycle: other dataSources are still refreshed, and a {@code
segment/schemaCache/refresh/failed}
+ * metric is emitted for the failing dataSource.
+ */
+ @Test
+ public void testRefreshFailureForOneDatasourceIsIsolated() throws
InterruptedException, IOException
+ {
+ final StubServiceEmitter emitter = new StubServiceEmitter("test", "test");
+ final CoordinatorSegmentMetadataCache schema = new
CoordinatorSegmentMetadataCache(
+ getQueryLifecycleFactory(walker),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ emitter,
+ segmentSchemaCache,
+ backFillQueue,
+ segmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
+ )
+ {
+ @Override
+ public Sequence<SegmentAnalysis>
runSegmentMetadataQuery(Iterable<SegmentId> segments)
+ {
+ // Simulate a metadata query that times out for DATASOURCE1 but
succeeds for everything else. Only the first segment is inspected, presumably because each call carries a single dataSource's segments.
+ final SegmentId first = segments.iterator().next();
+ if (DATASOURCE1.equals(first.getDataSource())) {
+ throw new QueryTimeoutException("test-induced timeout for " +
DATASOURCE1);
+ }
+ return super.runSegmentMetadataQuery(segments);
+ }
+ };
+
+ schema.onLeaderStart(); // NOTE(review): nothing in this test stops this locally built cache; confirm teardown does, or stop it in a finally block
+ schema.awaitInitialization();
+
+ final Set<SegmentId> allSegmentIds =
schema.getSegmentMetadataSnapshot().keySet();
+ emitter.flush(); // presumably discards metrics emitted during initialization so only the direct refresh below is asserted on
+
+ // Must not propagate the DATASOURCE1 failure.
+ final Set<SegmentId> refreshed = schema.refreshSegments(allSegmentIds);
+
+ // The healthy dataSources were still refreshed despite DATASOURCE1
failing.
+ Assert.assertTrue(
+ "expected a refreshed segment from " + DATASOURCE2,
+ refreshed.stream().anyMatch(id ->
DATASOURCE2.equals(id.getDataSource()))
+ );
+ Assert.assertTrue(
+ "expected a refreshed segment from " + SOME_DATASOURCE,
+ refreshed.stream().anyMatch(id ->
SOME_DATASOURCE.equals(id.getDataSource()))
+ );
+ // The failing dataSource produced no refreshed segments.
+ Assert.assertTrue(
+ "expected no refreshed segments from the failing " + DATASOURCE1,
+ refreshed.stream().noneMatch(id ->
DATASOURCE1.equals(id.getDataSource()))
+ );
+
+ // A failure metric was emitted, dimensioned by the failing dataSource.
+ final List<Number> failures = emitter.getMetricValues(
+ Metric.REFRESH_FAILED,
+ ImmutableMap.of(DruidMetrics.DATASOURCE, DATASOURCE1)
+ );
+ Assert.assertFalse("expected a refresh/failed metric for " + DATASOURCE1,
failures.isEmpty());
+ Assert.assertEquals(1, failures.get(0).intValue());
+ }
+
+ @Test
+ public void
testLocalInterruptionPropagatesButWrappedQueryFailureIsIsolated() throws
IOException
+ {
+ final StubServiceEmitter emitter = new StubServiceEmitter("test", "test");
+ final AtomicReference<Throwable> causeRef = new AtomicReference<>(); // cause for the stubbed QueryInterruptedException, swapped per scenario below
+ final CoordinatorSegmentMetadataCache schema = new
CoordinatorSegmentMetadataCache(
+ getQueryLifecycleFactory(walker),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ emitter,
+ segmentSchemaCache,
+ backFillQueue,
+ segmentsMetadataManager,
+ segmentsMetadataManagerConfigSupplier
+ )
+ {
+ @Override
+ public Sequence<SegmentAnalysis>
runSegmentMetadataQuery(Iterable<SegmentId> segments)
+ {
+ throw new QueryInterruptedException(causeRef.get()); // every query attempt fails with whatever cause the current scenario set
+ }
+ };
+
+ final Set<SegmentId> segments = ImmutableSet.of(
+ SegmentId.of(DATASOURCE1, Intervals.of("2000/2001"), "v1", 0)
+ );
+
+ // Genuine local interruption: propagate, restore the interrupt flag,
record no failure.
+ causeRef.set(new InterruptedException("test interrupt")); // NOTE(review): if assertThrows below fails before Thread.interrupted() runs, the interrupt flag may stay set for later tests
+ Assert.assertThrows(QueryInterruptedException.class, () ->
schema.refreshSegments(segments));
+ Assert.assertTrue("interrupt flag should be restored",
Thread.interrupted()); // also clears it for the next case
+ Assert.assertTrue(
+ "local interruption must not emit a refresh/failed metric",
+ emitter.getMetricEvents(Metric.REFRESH_FAILED).isEmpty()
+ );
+
+ // Wrapped ordinary failure (no InterruptedException cause): isolate it -
no propagation, failure recorded.
+ causeRef.set(new RuntimeException("wrapped query failure"));
+ schema.refreshSegments(segments); // must not throw
+ Assert.assertFalse("interrupt flag must not be set for a wrapped failure",
Thread.currentThread().isInterrupted());
+ final List<Number> failures = emitter.getMetricValues(
+ Metric.REFRESH_FAILED,
+ ImmutableMap.of(DruidMetrics.DATASOURCE, DATASOURCE1)
+ );
+ Assert.assertFalse("a wrapped query failure should emit refresh/failed",
failures.isEmpty());
+ Assert.assertEquals(1, failures.get(0).intValue());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]