This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8be787a51bb Allow use of centralized datasource schema and segment
metadata cache together (#17996)
8be787a51bb is described below
commit 8be787a51bbd695dbe83127002fba32e2ed4025f
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon May 19 22:28:56 2025 +0530
Allow use of centralized datasource schema and segment metadata cache
together (#17996)
Description
-----------
#17935 enables use of `HeapMemorySegmentMetadataCache` on the Coordinator.
But it cannot be used in conjunction with centralized datasource schema
(i.e. `SegmentSchemaCache`).
This patch supports usage of both features on the Coordinator together.
Main Changes
--------------
- Make `SegmentSchemaCache` a dependency of `HeapMemorySegmentMetadataCache`
- Bind `SegmentMetadataCache` and `SegmentSchemaCache` in
`MetadataManagerModule`
- Add `NoopSegmentSchemaCache` to be used on the Overlord
- Poll schemas in `HeapMemorySegmentMetadataCache` and update
`SegmentSchemaCache`
- Update the `used_status_last_updated` column of a segment record when its
schema fingerprint is updated
Fix a race condition
-------------------
Add a sync buffer duration of 10 seconds to `HeapMemorySegmentMetadataCache`
- Handles a race condition between sync and insert to cache (caught in
`CompactionTaskRunTest`)
- Prevents removal of entries from cache if they have a last updated time
just before sync start
and were added to the cache just after sync start
- This means that non-leader nodes will continue to consider a segment as
used if it was marked unused
within 10 seconds of any other update done to it (created, marked used, schema
info added)
- 10s is more than enough for this, since the cache is already performing
as expected in several prod clusters.
Guice changes
---------------
- Add `MetadataManagerModule` used only by Coordinator and Overlord to bind
metadata managers
- Restrict `SQLMetadataStorageDruidModule` to bind only SQL connector
related stuff
---
.../MaterializedViewSupervisorTest.java | 2 -
.../materializedview/DatasourceOptimizerTest.java | 2 -
.../indexing/common/actions/TaskActionTestKit.java | 7 +-
.../indexing/common/task/IngestionTestBase.java | 8 +-
.../overlord/TaskLockBoxConcurrencyTest.java | 3 -
.../druid/indexing/overlord/TaskLockboxTest.java | 3 -
.../indexing/overlord/TaskQueueScaleTest.java | 3 -
.../SeekableStreamIndexTaskTestBase.java | 2 -
...ose.cds-coordinator-metadata-query-disabled.yml | 4 +-
...er-compose.cds-task-schema-publish-disabled.yml | 4 +-
...ocker-compose.centralized-datasource-schema.yml | 4 +-
.../apache/druid/guice/MetadataConfigModule.java | 28 ++-
.../druid/guice/SQLMetadataStorageDruidModule.java | 56 ------
.../IndexerSQLMetadataStorageCoordinator.java | 14 +-
.../SqlSegmentsMetadataManagerProvider.java | 4 +-
.../druid/metadata/SqlSegmentsMetadataQuery.java | 110 +++++++++++
...lSegmentMetadataReadOnlyTransactionFactory.java | 114 +++++++++++
.../SqlSegmentMetadataTransactionFactory.java | 62 +-----
.../segment/SqlSegmentsMetadataManagerV2.java | 15 +-
.../cache/HeapMemoryDatasourceSegmentCache.java | 8 +-
.../cache/HeapMemorySegmentMetadataCache.java | 218 +++++++++++++++++++--
.../druid/metadata/segment/cache/Metric.java | 10 +
.../segment/cache/SegmentSchemaRecord.java | 47 +++++
.../segment/metadata/NoopSegmentSchemaCache.java | 133 +++++++++++++
.../druid/segment/metadata/SegmentSchemaCache.java | 33 +++-
.../segment/metadata/SegmentSchemaManager.java | 26 ++-
...rSQLMetadataStorageCoordinatorMarkUsedTest.java | 2 -
...rSQLMetadataStorageCoordinatorReadOnlyTest.java | 29 ++-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 15 +-
...ataStorageCoordinatorSchemaPersistenceTest.java | 6 +-
...dexerSqlMetadataStorageCoordinatorTestBase.java | 50 +++--
.../SqlSegmentsMetadataManagerProviderTest.java | 2 +-
.../segment/SqlSegmentsMetadataManagerV2Test.java | 63 +++---
.../cache/HeapMemorySegmentMetadataCacheTest.java | 165 +++++++++++++++-
.../server/coordinator/CreateDataSegments.java | 18 +-
.../duty/KillUnreferencedSegmentSchemaTest.java | 9 +-
.../java/org/apache/druid/cli/CliCoordinator.java | 20 +-
.../java/org/apache/druid/cli/CliOverlord.java | 9 +-
.../main/java/org/apache/druid/cli/CliPeon.java | 31 +--
.../java/org/apache/druid/cli/ServerRunnable.java | 14 +-
.../apache/druid/guice/MetadataManagerModule.java | 137 +++++++++++++
.../druid/guice/SegmentSchemaCacheModule.java | 2 -
42 files changed, 1138 insertions(+), 354 deletions(-)
diff --git
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 62b3f4af3d7..01964ce2826 100644
---
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Sets;
import junit.framework.AssertionFailedError;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.EntryAlreadyExists;
import org.apache.druid.indexer.HadoopIOConfig;
import org.apache.druid.indexer.HadoopIngestionSpec;
@@ -113,7 +112,6 @@ public class MaterializedViewSupervisorTest
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git
a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
index 755b0efddac..1ec98359db7 100644
---
a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
+++
b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
@@ -37,7 +37,6 @@ import org.apache.druid.client.DruidServer;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexing.materializedview.DerivativeDataSourceMetadata;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
@@ -123,7 +122,6 @@ public class DatasourceOptimizerTest extends CuratorTestBase
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index f5a28d5d67c..3a63495928a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
@@ -40,6 +39,7 @@ import
org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
@@ -49,8 +49,6 @@ import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.rules.ExternalResource;
-import java.util.Set;
-
public class TaskActionTestKit extends ExternalResource
{
private final MetadataStorageTablesConfig metadataStorageTablesConfig =
MetadataStorageTablesConfig.fromBase("druid");
@@ -172,10 +170,12 @@ public class TaskActionTestKit extends ExternalResource
= useSegmentMetadataCache
? SegmentMetadataCache.UsageMode.ALWAYS
: SegmentMetadataCache.UsageMode.NEVER;
+
segmentMetadataCache = new HeapMemorySegmentMetadataCache(
objectMapper,
Suppliers.ofInstance(new
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
Suppliers.ofInstance(metadataStorageTablesConfig),
+ new NoopSegmentSchemaCache(),
testDerbyConnector,
(poolSize, name) -> new WrappingScheduledExecutorService(name,
metadataCachePollExec, false),
NoopServiceEmitter.instance()
@@ -189,7 +189,6 @@ public class TaskActionTestKit extends ExternalResource
metadataStorageTablesConfig,
testDerbyConnector,
leaderSelector,
- Set.of(NodeRole.OVERLORD),
segmentMetadataCache,
NoopServiceEmitter.instance()
)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 9135b097578..848a2bb91ce 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -34,7 +34,6 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.RegexParseSpec;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
@@ -136,6 +135,7 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
private SupervisorManager supervisorManager;
private TestDataSegmentKiller dataSegmentKiller;
private SegmentMetadataCache segmentMetadataCache;
+ private SegmentSchemaCache segmentSchemaCache;
protected File reportsFile;
protected IngestionTestBase()
@@ -168,6 +168,7 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
derbyConnectorRule.getConnector()
);
+ segmentSchemaCache = new SegmentSchemaCache();
storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
createTransactionFactory(),
objectMapper,
@@ -176,14 +177,13 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
segmentSchemaManager,
CentralizedDatasourceSchemaConfig.create()
);
- SegmentSchemaCache segmentSchemaCache = new SegmentSchemaCache();
segmentsMetadataManager = new SqlSegmentsMetadataManagerV2(
segmentMetadataCache,
segmentSchemaCache,
derbyConnectorRule.getConnector(),
() -> new SegmentsMetadataManagerConfig(null, null),
derbyConnectorRule.metadataTablesConfigSupplier(),
- CentralizedDatasourceSchemaConfig.create(),
+ CentralizedDatasourceSchemaConfig::create,
NoopServiceEmitter.instance(),
objectMapper
);
@@ -325,6 +325,7 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
objectMapper,
Suppliers.ofInstance(new
SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)),
derbyConnectorRule.metadataTablesConfigSupplier(),
+ segmentSchemaCache,
derbyConnectorRule.getConnector(),
ScheduledExecutors::fixed,
NoopServiceEmitter.instance()
@@ -338,7 +339,6 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector(),
leaderSelector,
- Set.of(NodeRole.OVERLORD),
segmentMetadataCache,
NoopServiceEmitter.instance()
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
index a6eb19f4783..a55117aad12 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.guava.SettableSupplier;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
@@ -50,7 +49,6 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -91,7 +89,6 @@ public class TaskLockBoxConcurrencyTest
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 8c2c78611ac..48c5b5c4e94 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
@@ -138,7 +137,6 @@ public class TaskLockboxTest
tablesConfig,
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
@@ -481,7 +479,6 @@ public class TaskLockboxTest
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
index d73e51e013a..72e0e5ad855 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -70,7 +69,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -113,7 +111,6 @@ public class TaskQueueScaleTest
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector(),
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 485b6699d1b..e0332708c3f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -47,7 +47,6 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
@@ -599,7 +598,6 @@ public abstract class SeekableStreamIndexTaskTestBase
extends EasyMockSupport
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
diff --git
a/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
b/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
index 60cff61472d..b435c290ef8 100644
---
a/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
+++
b/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
@@ -40,7 +40,7 @@ services:
- druid_centralizedDatasourceSchema_backFillPeriod=15000
- druid_coordinator_segmentMetadata_metadataRefreshPeriod=PT15S
- druid_coordinator_segmentMetadata_disableSegmentMetadataQueries=true
- - druid_manager_segments_useIncrementalCache=never
+ - druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-overlord
- druid-metadata-storage
@@ -53,7 +53,7 @@ services:
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_centralizedDatasourceSchema_enabled=true
- - druid_manager_segments_useIncrementalCache=never
+ - druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
diff --git
a/integration-tests/docker/docker-compose.cds-task-schema-publish-disabled.yml
b/integration-tests/docker/docker-compose.cds-task-schema-publish-disabled.yml
index f7923f368d8..b423ad9f658 100644
---
a/integration-tests/docker/docker-compose.cds-task-schema-publish-disabled.yml
+++
b/integration-tests/docker/docker-compose.cds-task-schema-publish-disabled.yml
@@ -40,7 +40,7 @@ services:
- druid_centralizedDatasourceSchema_backFillEnabled=true
- druid_centralizedDatasourceSchema_backFillPeriod=15000
- druid_coordinator_segmentMetadata_metadataRefreshPeriod=PT15S
- - druid_manager_segments_useIncrementalCache=never
+ - druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-overlord
- druid-metadata-storage
@@ -54,7 +54,7 @@ services:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_centralizedDatasourceSchema_enabled=true
- druid_centralizedDatasourceSchema_taskSchemaPublishDisabled=true
- - druid_manager_segments_useIncrementalCache=never
+ - druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
diff --git
a/integration-tests/docker/docker-compose.centralized-datasource-schema.yml
b/integration-tests/docker/docker-compose.centralized-datasource-schema.yml
index 8674a562012..b47ce1533af 100644
--- a/integration-tests/docker/docker-compose.centralized-datasource-schema.yml
+++ b/integration-tests/docker/docker-compose.centralized-datasource-schema.yml
@@ -39,7 +39,7 @@ services:
- druid_centralizedDatasourceSchema_backFillEnabled=true
- druid_centralizedDatasourceSchema_backFillPeriod=15000
- druid_coordinator_segmentMetadata_metadataRefreshPeriod=PT15S
- - druid_manager_segments_useIncrementalCache=never
+ - druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-overlord
- druid-metadata-storage
@@ -52,7 +52,7 @@ services:
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_centralizedDatasourceSchema_enabled=true
- - druid_manager_segments_useIncrementalCache=never
+ - druid_manager_segments_useIncrementalCache=always
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
diff --git
a/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
b/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
index 1ca4578e560..7a429049db8 100644
--- a/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
+++ b/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
@@ -21,29 +21,43 @@ package org.apache.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
-import org.apache.druid.metadata.MetadataRuleManagerConfig;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
-import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import java.util.Properties;
+
+/**
+ * Binds the following metadata configs for all services:
+ * <ul>
+ * <li>{@link MetadataStorageTablesConfig}</li>
+ * <li>{@link MetadataStorageConnectorConfig}</li>
+ * <li>{@link CentralizedDatasourceSchemaConfig}</li>
+ * </ul>
+ * Ideally, the storage configs should be bound only on Coordinator and
Overlord,
+ * but they are needed for other services too since metadata storage extensions
+ * are currently loaded on all services.
+ */
public class MetadataConfigModule implements Module
{
+ public static final String CENTRALIZED_DATASOURCE_SCHEMA_ENABLED =
+ CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX + ".enabled";
+
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, MetadataStorageTablesConfig.PROPERTY_BASE,
MetadataStorageTablesConfig.class);
JsonConfigProvider.bind(binder,
MetadataStorageConnectorConfig.PROPERTY_BASE,
MetadataStorageConnectorConfig.class);
- JsonConfigProvider.bind(binder, "druid.manager.segments",
SegmentsMetadataManagerConfig.class);
- JsonConfigProvider.bind(binder, "druid.manager.rules",
MetadataRuleManagerConfig.class);
-
- // SegmentSchemaCacheConfig needs to be bound on all services since
- // it is a dependency of SqlSegmentsMetadataManager (both legacy and V2)
JsonConfigProvider.bind(
binder,
CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX,
CentralizedDatasourceSchemaConfig.class
);
}
+
+ public static boolean isSegmentSchemaCacheEnabled(Properties properties)
+ {
+ return
Boolean.parseBoolean(properties.getProperty(CENTRALIZED_DATASOURCE_SCHEMA_ENABLED));
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
b/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
index 5f8197d31c3..8ef0f36a07f 100644
---
a/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
+++
b/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
@@ -23,25 +23,10 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import org.apache.druid.audit.AuditManager;
-import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
-import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
-import org.apache.druid.metadata.MetadataRuleManager;
-import org.apache.druid.metadata.MetadataRuleManagerProvider;
import org.apache.druid.metadata.MetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageProvider;
-import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SQLMetadataConnector;
-import org.apache.druid.metadata.SQLMetadataRuleManager;
-import org.apache.druid.metadata.SQLMetadataRuleManagerProvider;
-import org.apache.druid.metadata.SQLMetadataSupervisorManager;
-import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
-import org.apache.druid.metadata.SqlSegmentsMetadataManagerProvider;
-import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
-import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
-import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
-import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.server.audit.AuditManagerConfig;
import org.apache.druid.server.audit.AuditSerdeHelper;
import org.apache.druid.server.audit.SQLAuditManager;
@@ -69,14 +54,7 @@ public class SQLMetadataStorageDruidModule implements Module
PolyBind.createChoiceWithDefault(binder, prop,
Key.get(MetadataStorageProvider.class), defaultValue);
PolyBind.createChoiceWithDefault(binder, prop,
Key.get(SQLMetadataConnector.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop,
Key.get(SegmentsMetadataManager.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop,
Key.get(SegmentsMetadataManagerProvider.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop,
Key.get(MetadataRuleManager.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop,
Key.get(MetadataRuleManagerProvider.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop,
Key.get(SegmentMetadataTransactionFactory.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop,
Key.get(IndexerMetadataStorageCoordinator.class), defaultValue);
PolyBind.createChoiceWithDefault(binder, prop,
Key.get(MetadataStorageActionHandlerFactory.class), defaultValue);
- PolyBind.createChoiceWithDefault(binder, prop,
Key.get(MetadataSupervisorManager.class), defaultValue);
configureAuditManager(binder);
}
@@ -84,41 +62,7 @@ public class SQLMetadataStorageDruidModule implements Module
@Override
public void configure(Binder binder)
{
- PolyBind.optionBinder(binder,
Key.get(SegmentsMetadataManagerProvider.class))
- .addBinding(type)
- .to(SqlSegmentsMetadataManagerProvider.class)
- .in(LazySingleton.class);
- PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
- .addBinding(type)
- .to(SQLMetadataRuleManager.class)
- .in(LazySingleton.class);
-
- PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
- .addBinding(type)
- .to(SQLMetadataRuleManagerProvider.class)
- .in(LazySingleton.class);
-
- // SegmentMetadataCache is bound for all services but is used only by the
Overlord and Coordinator
- // similar to some other classes bound here, such as
IndexerSQLMetadataStorageCoordinator
- binder.bind(SegmentMetadataCache.class)
- .to(HeapMemorySegmentMetadataCache.class)
- .in(LazySingleton.class);
-
- PolyBind.optionBinder(binder,
Key.get(SegmentMetadataTransactionFactory.class))
- .addBinding(type)
- .to(SqlSegmentMetadataTransactionFactory.class)
- .in(LazySingleton.class);
-
- PolyBind.optionBinder(binder,
Key.get(IndexerMetadataStorageCoordinator.class))
- .addBinding(type)
- .to(IndexerSQLMetadataStorageCoordinator.class)
- .in(ManageLifecycle.class);
-
- PolyBind.optionBinder(binder, Key.get(MetadataSupervisorManager.class))
- .addBinding(type)
- .to(SQLMetadataSupervisorManager.class)
- .in(LazySingleton.class);
}
private void configureAuditManager(Binder binder)
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 6ef71cb83de..8ede92dc9c5 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1631,7 +1631,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
private void persistSchema(
final SegmentMetadataTransaction transaction,
final Set<DataSegment> segments,
- final SegmentSchemaMapping segmentSchemaMapping
+ final SegmentSchemaMapping segmentSchemaMapping,
+ final DateTime updateTime
) throws JsonProcessingException
{
if (segmentSchemaMapping.getSchemaVersion() !=
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) {
@@ -1650,7 +1651,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
transaction.getHandle(),
dataSource,
segmentSchemaMapping.getSchemaVersion(),
- segmentSchemaMapping.getSchemaFingerprintToPayloadMap()
+ segmentSchemaMapping.getSchemaFingerprintToPayloadMap(),
+ updateTime
);
}
@@ -1662,10 +1664,11 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
{
final Set<DataSegment> toInsertSegments = new HashSet<>();
try {
+ final DateTime createdTime = DateTimes.nowUtc();
boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping);
if (shouldPersistSchema) {
- persistSchema(transaction, segments, segmentSchemaMapping);
+ persistSchema(transaction, segments, segmentSchemaMapping,
createdTime);
}
final Set<SegmentId> segmentIds =
segments.stream().map(DataSegment::getId).collect(Collectors.toSet());
@@ -1678,7 +1681,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
}
- final DateTime createdTime = DateTimes.nowUtc();
final Set<DataSegment> usedSegments =
findNonOvershadowedSegments(segments);
final Set<DataSegmentPlus> segmentPlusToInsert =
toInsertSegments.stream().map(segment -> {
@@ -1881,8 +1883,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Map<String, String> upgradedFromSegmentIdMap
) throws Exception
{
+ final DateTime createdTime = DateTimes.nowUtc();
if (shouldPersistSchema(segmentSchemaMapping)) {
- persistSchema(transaction, segments, segmentSchemaMapping);
+ persistSchema(transaction, segments, segmentSchemaMapping, createdTime);
}
// Do not insert segment IDs which already exist
@@ -1892,7 +1895,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
s -> !existingSegmentIds.contains(s.getId().toString())
).collect(Collectors.toSet());
- final DateTime createdTime = DateTimes.nowUtc();
final Set<DataSegmentPlus> segmentPlusToInsert =
segmentsToInsert.stream().map(segment -> {
SegmentMetadata segmentMetadata =
getSegmentMetadataFromSchemaMappingOrUpgradeMetadata(
segment.getId(),
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProvider.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProvider.java
index f69dc950808..8491c524226 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProvider.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProvider.java
@@ -39,7 +39,7 @@ public class SqlSegmentsMetadataManagerProvider implements
SegmentsMetadataManag
private final ServiceEmitter serviceEmitter;
private final SegmentSchemaCache segmentSchemaCache;
private final SegmentMetadataCache segmentMetadataCache;
- private final CentralizedDatasourceSchemaConfig
centralizedDatasourceSchemaConfig;
+ private final Supplier<CentralizedDatasourceSchemaConfig>
centralizedDatasourceSchemaConfig;
@Inject
public SqlSegmentsMetadataManagerProvider(
@@ -50,7 +50,7 @@ public class SqlSegmentsMetadataManagerProvider implements
SegmentsMetadataManag
SQLMetadataConnector connector,
Lifecycle lifecycle,
SegmentSchemaCache segmentSchemaCache,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ Supplier<CentralizedDatasourceSchemaConfig>
centralizedDatasourceSchemaConfig,
ServiceEmitter serviceEmitter
)
{
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index b1d7d16a145..de2e35813c4 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -39,6 +39,9 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.segment.cache.SegmentSchemaRecord;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
@@ -607,6 +610,91 @@ public class SqlSegmentsMetadataQuery
return CloseableIterators.wrap(resultIterator, resultIterator);
}
+ /**
+ * Retrieves all used schema fingerprints present in the metadata store.
+ */
+ public Set<String> retrieveAllUsedSegmentSchemaFingerprints()
+ {
+ final String sql = StringUtils.format(
+ "SELECT fingerprint FROM %s WHERE version = %s AND used = true",
+ dbTables.getSegmentSchemasTable(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+ );
+ return Set.copyOf(
+ handle.createQuery(sql)
+ .setFetchSize(connector.getStreamingFetchSize())
+ .mapTo(String.class)
+ .list()
+ );
+ }
+
+ /**
+ * Retrieves all used segment schemas present in the metadata store
irrespective
+ * of their last updated time.
+ */
+ public List<SegmentSchemaRecord> retrieveAllUsedSegmentSchemas()
+ {
+ final String sql = StringUtils.format(
+ "SELECT fingerprint, payload FROM %s"
+ + " WHERE version = %s AND used = true",
+ dbTables.getSegmentSchemasTable(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+ );
+ return retrieveValidSchemaRecordsWithQuery(handle.createQuery(sql));
+ }
+
+ /**
+ * Retrieves segment schemas from the metadata store for the given
fingerprints.
+ */
+ public List<SegmentSchemaRecord> retrieveUsedSegmentSchemasForFingerprints(
+ Set<String> schemaFingerprints
+ )
+ {
+ final List<List<String>> fingerprintBatches = Lists.partition(
+ List.copyOf(schemaFingerprints),
+ MAX_INTERVALS_PER_BATCH
+ );
+
+ final List<SegmentSchemaRecord> records = new ArrayList<>();
+ for (List<String> fingerprintBatch : fingerprintBatches) {
+ records.addAll(
+ retrieveBatchOfSegmentSchemas(fingerprintBatch)
+ );
+ }
+
+ return records;
+ }
+
+ /**
+ * Retrieves a batch of segment schema records for the given fingerprints.
+ */
+ private List<SegmentSchemaRecord> retrieveBatchOfSegmentSchemas(List<String>
schemaFingerprints)
+ {
+ final String sql = StringUtils.format(
+ "SELECT fingerprint, payload FROM %s"
+ + " WHERE version = %s AND used = true"
+ + " %s",
+ dbTables.getSegmentSchemasTable(),
+ CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
+ getParameterizedInConditionForColumn("fingerprint", schemaFingerprints)
+ );
+
+ final Query<Map<String, Object>> query = handle.createQuery(sql);
+ bindColumnValuesToQueryWithInCondition("fingerprint", schemaFingerprints,
query);
+
+ return retrieveValidSchemaRecordsWithQuery(query);
+ }
+
+ private List<SegmentSchemaRecord> retrieveValidSchemaRecordsWithQuery(
+ Query<Map<String, Object>> query
+ )
+ {
+ return query.setFetchSize(connector.getStreamingFetchSize())
+ .map((index, r, ctx) -> mapToSchemaRecord(r))
+ .list()
+ .stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
/**
* Marks the given segment IDs as used.
*
@@ -1467,6 +1555,28 @@ public class SqlSegmentsMetadataQuery
}
}
+ /**
+ * Tries to parse the fields of the result set into a {@link
SegmentSchemaRecord}.
+ *
+ * @return null if an error occurred while parsing the result
+ */
+ @Nullable
+ private SegmentSchemaRecord mapToSchemaRecord(ResultSet resultSet)
+ {
+ String fingerprint = null;
+ try {
+ fingerprint = resultSet.getString("fingerprint");
+ return new SegmentSchemaRecord(
+ fingerprint,
+ jsonMapper.readValue(resultSet.getBytes("payload"),
SchemaPayload.class)
+ );
+ }
+ catch (Throwable t) {
+ log.error(t, "Could not read segment schema with fingerprint[%s]",
fingerprint);
+ return null;
+ }
+ }
+
private ResultIterator<DataSegment>
getDataSegmentResultIterator(Query<Map<String, Object>> sql)
{
return sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper,
r.getBytes(2), DataSegment.class))
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java
new file mode 100644
index 00000000000..0d0d1e42a1e
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.segment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.TransactionStatus;
+
+/**
+ * Factory for read-only {@link SegmentMetadataTransaction}s that always read
+ * directly from the metadata store and never from the {@code
SegmentMetadataCache}.
+ *
+ * @see SqlSegmentMetadataTransactionFactory
+ */
+public class SqlSegmentMetadataReadOnlyTransactionFactory implements
SegmentMetadataTransactionFactory
+{
+ private static final int QUIET_RETRIES = 2;
+ private static final int MAX_RETRIES = 3;
+
+ private final ObjectMapper jsonMapper;
+ private final MetadataStorageTablesConfig tablesConfig;
+ private final SQLMetadataConnector connector;
+
+ @Inject
+ public SqlSegmentMetadataReadOnlyTransactionFactory(
+ ObjectMapper jsonMapper,
+ MetadataStorageTablesConfig tablesConfig,
+ SQLMetadataConnector connector
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.tablesConfig = tablesConfig;
+ this.connector = connector;
+ }
+
+ public int getMaxRetries()
+ {
+ return MAX_RETRIES;
+ }
+
+ public int getQuietRetries()
+ {
+ return QUIET_RETRIES;
+ }
+
+ @Override
+ public <T> T inReadOnlyDatasourceTransaction(
+ String dataSource,
+ SegmentMetadataReadTransaction.Callback<T> callback
+ )
+ {
+ return connector.retryReadOnlyTransaction(
+ (handle, status) -> {
+ final SegmentMetadataTransaction sqlTransaction
+ = createSqlTransaction(dataSource, handle, status);
+ return executeReadAndClose(sqlTransaction, callback);
+ },
+ QUIET_RETRIES,
+ getMaxRetries()
+ );
+ }
+
+ @Override
+ public <T> T inReadWriteDatasourceTransaction(
+ String dataSource,
+ SegmentMetadataTransaction.Callback<T> callback
+ )
+ {
+ throw DruidException.defensive("Only Overlord can perform write
transactions on segment metadata.");
+ }
+
+ protected SegmentMetadataTransaction createSqlTransaction(
+ String dataSource,
+ Handle handle,
+ TransactionStatus transactionStatus
+ )
+ {
+ return new SqlSegmentMetadataTransaction(
+ dataSource,
+ handle, transactionStatus, connector, tablesConfig, jsonMapper
+ );
+ }
+
+ protected <T> T executeReadAndClose(
+ SegmentMetadataReadTransaction transaction,
+ SegmentMetadataReadTransaction.Callback<T> callback
+ ) throws Exception
+ {
+ try (transaction) {
+ return callback.inTransaction(transaction);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
index 2942ed1a086..26083e44b67 100644
---
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
+++
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
@@ -23,9 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderSelector;
-import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@@ -34,10 +31,6 @@ import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.segment.cache.Metric;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.query.DruidMetrics;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.TransactionStatus;
-
-import java.util.Set;
/**
* Factory for {@link SegmentMetadataTransaction}s. If the
@@ -56,46 +49,30 @@ import java.util.Set;
* now, it continues to read directly from the metadata store for consistency
* with older Druid versions.
*/
-public class SqlSegmentMetadataTransactionFactory implements
SegmentMetadataTransactionFactory
+public class SqlSegmentMetadataTransactionFactory extends
SqlSegmentMetadataReadOnlyTransactionFactory
{
private static final Logger log = new
Logger(SqlSegmentMetadataTransactionFactory.class);
- private static final int QUIET_RETRIES = 2;
- private static final int MAX_RETRIES = 3;
-
- private final ObjectMapper jsonMapper;
- private final MetadataStorageTablesConfig tablesConfig;
private final SQLMetadataConnector connector;
private final DruidLeaderSelector leaderSelector;
private final SegmentMetadataCache segmentMetadataCache;
private final ServiceEmitter emitter;
- private final boolean isNotOverlord;
-
@Inject
public SqlSegmentMetadataTransactionFactory(
ObjectMapper jsonMapper,
MetadataStorageTablesConfig tablesConfig,
SQLMetadataConnector connector,
@IndexingService DruidLeaderSelector leaderSelector,
- @Self Set<NodeRole> nodeRoles,
SegmentMetadataCache segmentMetadataCache,
ServiceEmitter emitter
)
{
- this.jsonMapper = jsonMapper;
- this.tablesConfig = tablesConfig;
+ super(jsonMapper, tablesConfig, connector);
this.connector = connector;
this.leaderSelector = leaderSelector;
this.segmentMetadataCache = segmentMetadataCache;
this.emitter = emitter;
-
- this.isNotOverlord = !nodeRoles.contains(NodeRole.OVERLORD);
- }
-
- public int getMaxRetries()
- {
- return MAX_RETRIES;
}
@Override
@@ -109,10 +86,7 @@ public class SqlSegmentMetadataTransactionFactory
implements SegmentMetadataTran
final SegmentMetadataTransaction sqlTransaction
= createSqlTransaction(dataSource, handle, status);
- if (isNotOverlord) {
- // Read directly from the metadata store if not Overlord
- return executeReadAndClose(sqlTransaction, callback);
- } else if (segmentMetadataCache.isSyncedForRead()) {
+ if (segmentMetadataCache.isSyncedForRead()) {
// Use cache as it is already synced with the metadata store
emitTransactionCount(Metric.READ_ONLY_TRANSACTIONS, dataSource);
return segmentMetadataCache.readCacheForDataSource(dataSource,
dataSourceCache -> {
@@ -124,7 +98,7 @@ public class SqlSegmentMetadataTransactionFactory implements
SegmentMetadataTran
return executeReadAndClose(sqlTransaction, callback);
}
},
- QUIET_RETRIES,
+ getQuietRetries(),
getMaxRetries()
);
}
@@ -135,10 +109,6 @@ public class SqlSegmentMetadataTransactionFactory
implements SegmentMetadataTran
SegmentMetadataTransaction.Callback<T> callback
)
{
- if (isNotOverlord) {
- throw DruidException.defensive("Only Overlord can perform write
transactions on segment metadata.");
- }
-
return connector.retryTransaction(
(handle, status) -> {
final SegmentMetadataTransaction sqlTransaction
@@ -167,23 +137,11 @@ public class SqlSegmentMetadataTransactionFactory
implements SegmentMetadataTran
return executeWriteAndClose(sqlTransaction, callback);
}
},
- QUIET_RETRIES,
+ getQuietRetries(),
getMaxRetries()
);
}
- private SegmentMetadataTransaction createSqlTransaction(
- String dataSource,
- Handle handle,
- TransactionStatus transactionStatus
- )
- {
- return new SqlSegmentMetadataTransaction(
- dataSource,
- handle, transactionStatus, connector, tablesConfig, jsonMapper
- );
- }
-
private <T> T executeWriteAndClose(
SegmentMetadataTransaction transaction,
SegmentMetadataTransaction.Callback<T> callback
@@ -201,16 +159,6 @@ public class SqlSegmentMetadataTransactionFactory
implements SegmentMetadataTran
}
}
- private <T> T executeReadAndClose(
- SegmentMetadataReadTransaction transaction,
- SegmentMetadataReadTransaction.Callback<T> callback
- ) throws Exception
- {
- try (transaction) {
- return callback.inTransaction(transaction);
- }
- }
-
private void emitTransactionCount(String metricName, String datasource)
{
emitter.emit(
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2.java
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2.java
index 893950c8362..9d044962dd7 100644
---
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2.java
+++
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2.java
@@ -63,7 +63,6 @@ public class SqlSegmentsMetadataManagerV2 implements
SegmentsMetadataManager
private final SegmentsMetadataManager delegate;
private final SegmentMetadataCache segmentMetadataCache;
private final SegmentsMetadataManagerConfig managerConfig;
- private final CentralizedDatasourceSchemaConfig schemaConfig;
public SqlSegmentsMetadataManagerV2(
SegmentMetadataCache segmentMetadataCache,
@@ -71,7 +70,7 @@ public class SqlSegmentsMetadataManagerV2 implements
SegmentsMetadataManager
SQLMetadataConnector connector,
Supplier<SegmentsMetadataManagerConfig> managerConfig,
Supplier<MetadataStorageTablesConfig> tablesConfig,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ Supplier<CentralizedDatasourceSchemaConfig>
centralizedDatasourceSchemaConfig,
ServiceEmitter serviceEmitter,
ObjectMapper jsonMapper
)
@@ -79,20 +78,10 @@ public class SqlSegmentsMetadataManagerV2 implements
SegmentsMetadataManager
this.delegate = new SqlSegmentsMetadataManager(
jsonMapper,
managerConfig, tablesConfig, connector, segmentSchemaCache,
- centralizedDatasourceSchemaConfig, serviceEmitter
+ centralizedDatasourceSchemaConfig.get(), serviceEmitter
);
this.managerConfig = managerConfig.get();
this.segmentMetadataCache = segmentMetadataCache;
- this.schemaConfig = centralizedDatasourceSchemaConfig;
-
- // Segment metadata cache currently cannot handle schema updates
- if (segmentMetadataCache.isEnabled() && schemaConfig.isEnabled()) {
- throw new IllegalArgumentException(
- "Segment metadata incremental
cache['druid.manager.segments.useIncrementalCache']"
- + " and segment schema
cache['druid.centralizedDatasourceSchema.enabled']"
- + " must not be enabled together."
- );
- }
}
/**
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
index 8b9258a32a5..3a091a6c0b4 100644
---
a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
+++
b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
@@ -224,7 +224,7 @@ class HeapMemoryDatasourceSegmentCache extends
ReadWriteCache implements AutoClo
*
* @param persistedSegmentIds Segment IDs present in the metadata store
* @param syncStartTime Start time of the current sync
- * @return Number of unpersisted segments removed from cache.
+ * @return Number of unpersisted segment IDs removed from the cache.
*/
private int removeUnpersistedSegments(Set<SegmentId> persistedSegmentIds,
DateTime syncStartTime)
{
@@ -725,12 +725,6 @@ class HeapMemoryDatasourceSegmentCache extends
ReadWriteCache implements AutoClo
&& unusedSegmentIdToUpdatedTime.isEmpty();
}
- private boolean isSegmentIdCached(SegmentId id)
- {
- return idToUsedSegment.containsKey(id)
- || unusedSegmentIdToUpdatedTime.containsKey(id);
- }
-
/**
* Removes the given segment from the cache.
*
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
index e954a4f8b4a..2a1cf50133e 100644
---
a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
+++
b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
@@ -22,6 +22,7 @@ package org.apache.druid.metadata.segment.cache;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -48,6 +49,9 @@ import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -56,6 +60,7 @@ import org.joda.time.Duration;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.TransactionCallback;
+import javax.annotation.Nullable;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
@@ -70,6 +75,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* In-memory implementation of {@link SegmentMetadataCache}.
@@ -100,6 +107,22 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
private static final int MIN_SYNC_DELAY_MILLIS = 1000;
private static final int MAX_IMMEDIATE_SYNC_RETRIES = 3;
+ /**
+ * Buffer duration for which entries are kept in the cache even if the
+ * metadata store does not have them. In other words, a segment entry is
+ * removed from cache if the entry is not present in metadata store and has a
+ * {@code lastUpdatedTime < syncStartTime - bufferWindow}.
+ * <p>
+ * This is primarily needed to handle a race condition between insert and
sync
+ * where an entry with an updated time just before the sync start is added to
+ * the cache just after the sync has started.
+ * <p>
+ * This means that non-leader Overlord and all Coordinators will continue to
+ * consider a segment as used if it was marked as unused within the buffer
period
+ * of a previous update (e.g. segment created, marked used or schema info
updated).
+ */
+ private static final Duration SYNC_BUFFER_DURATION =
Duration.standardSeconds(10);
+
private enum CacheState
{
STOPPED, FOLLOWER, LEADER_FIRST_SYNC_PENDING, LEADER_FIRST_SYNC_STARTED,
LEADER_READY
@@ -111,6 +134,9 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
private final MetadataStorageTablesConfig tablesConfig;
private final SQLMetadataConnector connector;
+ private final boolean useSchemaCache;
+ private final SegmentSchemaCache segmentSchemaCache;
+
private final ListeningScheduledExecutorService pollExecutor;
private final ServiceEmitter emitter;
@@ -141,6 +167,7 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
ObjectMapper jsonMapper,
Supplier<SegmentsMetadataManagerConfig> config,
Supplier<MetadataStorageTablesConfig> tablesConfig,
+ SegmentSchemaCache segmentSchemaCache,
SQLMetadataConnector connector,
ScheduledExecutorFactory executorFactory,
ServiceEmitter emitter
@@ -150,6 +177,8 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
this.cacheMode = config.get().getCacheUsageMode();
this.pollDuration = config.get().getPollDuration().toStandardDuration();
this.tablesConfig = tablesConfig.get();
+ this.useSchemaCache = segmentSchemaCache.isEnabled();
+ this.segmentSchemaCache = segmentSchemaCache;
this.connector = connector;
this.pollExecutor = isEnabled()
?
MoreExecutors.listeningDecorator(executorFactory.create(1,
"SegmentMetadataCache-%s"))
@@ -499,15 +528,18 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
* <p>
* The following actions are performed in every sync:
* <ul>
- * <li>Retrieve all used and unused segment IDs along with their updated
timestamps</li>
+ * <li>Retrieve all used segment IDs along with their updated
timestamps.</li>
+ * <li>Sync segment IDs in the cache with the retrieved segment IDs.</li>
* <li>Retrieve payloads of used segments which have been updated in the
metadata
- * store but not in the cache</li>
- * <li>Retrieve all pending segments and update the cache as needed</li>
- * <li>Remove segments not present in the metadata store</li>
- * <li>Reset the max unused partition IDs</li>
- * <li>Change the cache state to ready if it is leader and waiting for first
sync</li>
+ * store but not in the cache.</li>
+ * <li>Retrieve all pending segments and update the cache as needed.</li>
+ * <li>If schema caching is enabled, retrieve segment schemas and reset them
+ * in the {@link SegmentSchemaCache}.</li>
+ * <li>Clean up entries for datasources which have no segments in the cache
anymore.</li>
+ * <li>Change the cache state to ready if it is leader and waiting for first
sync.</li>
* <li>Emit metrics</li>
* </ul>
+ * </p>
*
* @return Time taken in milliseconds for the sync to finish
*/
@@ -532,13 +564,18 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
retrieveAllUsedSegments(datasourceToSummary);
} else {
retrieveUsedSegmentIds(datasourceToSummary);
- updateSegmentIdsInCache(datasourceToSummary, syncStartTime);
+ updateSegmentIdsInCache(datasourceToSummary,
syncStartTime.minus(SYNC_BUFFER_DURATION));
retrieveUsedSegmentPayloads(datasourceToSummary);
}
updateUsedSegmentPayloadsInCache(datasourceToSummary);
retrieveAllPendingSegments(datasourceToSummary);
- updatePendingSegmentsInCache(datasourceToSummary, syncStartTime);
+ updatePendingSegmentsInCache(datasourceToSummary,
syncStartTime.minus(SYNC_BUFFER_DURATION));
+
+ if (useSchemaCache) {
+ retrieveAndResetUsedSegmentSchemas(datasourceToSummary);
+ }
+
markCacheSynced(syncStartTime);
syncFinishTime.set(DateTimes.nowUtc());
@@ -642,6 +679,16 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
emitMetric(Metric.RETRIEVE_SEGMENT_IDS_DURATION_MILLIS,
retrieveDuration.millisElapsed());
}
+ private <T> T query(Function<SqlSegmentsMetadataQuery, T> sqlFunction)
+ {
+ return inReadOnlyTransaction(
+ (handle, status) -> sqlFunction.apply(
+ SqlSegmentsMetadataQuery
+ .forHandle(handle, connector, tablesConfig, jsonMapper)
+ )
+ );
+ }
+
private <T> T inReadOnlyTransaction(TransactionCallback<T> callback)
{
return connector.retryReadOnlyTransaction(callback, SQL_QUIET_RETRIES,
SQL_MAX_RETRIES);
@@ -668,7 +715,7 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
CloseableIterator<DataSegmentPlus> iterator =
SqlSegmentsMetadataQuery
.forHandle(handle, connector, tablesConfig, jsonMapper)
- .retrieveSegmentsByIdIterator(dataSource,
segmentIdsToRefresh, false)
+ .retrieveSegmentsByIdIterator(dataSource,
segmentIdsToRefresh, useSchemaCache)
) {
iterator.forEachRemaining(summary.usedSegments::add);
return 0;
@@ -679,7 +726,8 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
/**
* Updates the cache of each datasource with the segment IDs fetched from the
* metadata store in {@link #retrieveUsedSegmentIds}. The update done on each
- * datasource cache is atomic.
+ * datasource cache is atomic. Also identifies the segment IDs which have
been
+ * updated in the metadata store and need to be refreshed in the cache.
*/
private void updateSegmentIdsInCache(
Map<String, DatasourceSegmentSummary> datasourceToSummary,
@@ -734,10 +782,20 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
)
{
final Stopwatch retrieveDuration = Stopwatch.createStarted();
- final String sql = StringUtils.format(
- "SELECT id, payload, created_date, used_status_last_updated FROM %s
WHERE used = true",
- tablesConfig.getSegmentsTable()
- );
+ final String sql;
+ if (useSchemaCache) {
+ sql = StringUtils.format(
+ "SELECT id, payload, created_date, used_status_last_updated,
schema_fingerprint, num_rows"
+ + " FROM %s WHERE used = true",
+ tablesConfig.getSegmentsTable()
+ );
+ } else {
+ sql = StringUtils.format(
+ "SELECT id, payload, created_date, used_status_last_updated"
+ + " FROM %s WHERE used = true",
+ tablesConfig.getSegmentsTable()
+ );
+ }
final int numSkippedSegments = inReadOnlyTransaction((handle, status) -> {
try (
@@ -870,11 +928,139 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
}
}
+ /**
+ * Retrieves required used segment schemas from the metadata store and resets
+ * them in the {@link SegmentSchemaCache}. If this is the first sync, all
used
+ * schemas are retrieved from the metadata store. If this is a delta sync,
+ * first only the fingerprints of all used schemas are retrieved. Payloads
are
+ * then fetched for only the fingerprints which are not present in the cache.
+ */
+ private void retrieveAndResetUsedSegmentSchemas(
+ Map<String, DatasourceSegmentSummary> datasourceToSummary
+ )
+ {
+ final Stopwatch schemaSyncDuration = Stopwatch.createStarted();
+
+ // Reset the SegmentSchemaCache with latest schemas and metadata
+ final Map<String, SchemaPayload> schemaFingerprintToPayload;
+ if (syncFinishTime.get() == null) {
+ schemaFingerprintToPayload =
buildSchemaFingerprintToPayloadMapForFullSync();
+ } else {
+ schemaFingerprintToPayload =
buildSchemaFingerprintToPayloadMapForDeltaSync();
+ }
+
+ segmentSchemaCache.resetSchemaForPublishedSegments(
+ buildSegmentIdToMetadataMapForSync(datasourceToSummary),
+ schemaFingerprintToPayload
+ );
+
+ // Emit metrics for the current contents of the cache
+ segmentSchemaCache.getStats().forEach(this::emitMetric);
+ emitMetric(Metric.RETRIEVE_SEGMENT_SCHEMAS_DURATION_MILLIS,
schemaSyncDuration.millisElapsed());
+ }
+
+ /**
+ * Retrieves all used segment schemas from the metadata store and builds a
+ * fresh map from schema fingerprint to payload.
+ */
+ private Map<String, SchemaPayload>
buildSchemaFingerprintToPayloadMapForFullSync()
+ {
+ final List<SegmentSchemaRecord> records = query(
+ SqlSegmentsMetadataQuery::retrieveAllUsedSegmentSchemas
+ );
+
+ return records.stream().collect(
+ Collectors.toMap(
+ SegmentSchemaRecord::getFingerprint,
+ SegmentSchemaRecord::getPayload
+ )
+ );
+ }
+
+ /**
+ * Retrieves segment schemas from the metadata store if they are not present
+ * in the cache or have been recently updated in the metadata store. These
+ * segment schemas along with those already present in the cache are used to
+ * build a complete updated map from schema fingerprint to payload.
+ *
+ * @return Complete updated map from schema fingerprint to payload for all
+ * used segment schemas currently persisted in the metadata store.
+ */
+ private Map<String, SchemaPayload>
buildSchemaFingerprintToPayloadMapForDeltaSync()
+ {
+ // Identify fingerprints in the cache and in the metadata store
+ final Map<String, SchemaPayload> schemaFingerprintToPayload = new
HashMap<>(
+ segmentSchemaCache.getPublishedSchemaPayloadMap()
+ );
+ final Set<String> cachedFingerprints =
Set.copyOf(schemaFingerprintToPayload.keySet());
+ final Set<String> persistedFingerprints = query(
+ SqlSegmentsMetadataQuery::retrieveAllUsedSegmentSchemaFingerprints
+ );
+
+ // Remove entry for schemas that have been deleted from the metadata store
+ final Set<String> deletedFingerprints =
Sets.difference(cachedFingerprints, persistedFingerprints);
+ deletedFingerprints.forEach(schemaFingerprintToPayload::remove);
+
+ // Retrieve and add entry for schemas that have been added to the metadata
store
+ final Set<String> addedFingerprints =
Sets.difference(persistedFingerprints, cachedFingerprints);
+ final List<SegmentSchemaRecord> addedSegmentSchemaRecords = query(
+ sql -> sql.retrieveUsedSegmentSchemasForFingerprints(addedFingerprints)
+ );
+ if (addedSegmentSchemaRecords.size() < addedFingerprints.size()) {
+ emitMetric(Metric.SKIPPED_SEGMENT_SCHEMAS, addedFingerprints.size() -
addedSegmentSchemaRecords.size());
+ }
+ addedSegmentSchemaRecords.forEach(
+ schema -> schemaFingerprintToPayload.put(schema.getFingerprint(),
schema.getPayload())
+ );
+
+ return schemaFingerprintToPayload;
+ }
+
+ /**
+ * Builds a map from {@link SegmentId} to {@link SegmentMetadata} for all
used
+ * segments currently present in the metadata store based on the current
sync.
+ */
+ private Map<SegmentId, SegmentMetadata> buildSegmentIdToMetadataMapForSync(
+ Map<String, DatasourceSegmentSummary> datasourceToSummary
+ )
+ {
+ final Map<SegmentId, SegmentMetadata> cachedSegmentIdToMetadata =
+ segmentSchemaCache.getPublishedSegmentMetadataMap();
+
+ final Map<SegmentId, SegmentMetadata> syncedSegmentIdToMetadataMap = new
HashMap<>(
+ cachedSegmentIdToMetadata
+ );
+
+ // Remove entry for segments not present in datasource cache (now synced
with the metadata store)
+ cachedSegmentIdToMetadata.keySet().forEach(segmentId -> {
+ final DataSegment cachedSegment =
+
getCacheForDatasource(segmentId.getDataSource()).findUsedSegment(segmentId);
+ if (cachedSegment == null) {
+ syncedSegmentIdToMetadataMap.remove(segmentId);
+ }
+ });
+
+ // Add entry for segments that have been added to the datasource cache
+ datasourceToSummary.values().forEach(summary -> {
+ summary.usedSegments.forEach(segment -> {
+ if (segment.getNumRows() != null && segment.getSchemaFingerprint() !=
null) {
+ syncedSegmentIdToMetadataMap.put(
+ segment.getDataSegment().getId(),
+ new SegmentMetadata(segment.getNumRows(),
segment.getSchemaFingerprint())
+ );
+ }
+ });
+ });
+
+ return syncedSegmentIdToMetadataMap;
+ }
+
/**
* Tries to parse the fields of the result set into a {@link
DataSegmentPlus}.
*
* @return null if an error occurred while parsing the result
*/
+ @Nullable
private DataSegmentPlus mapToSegmentPlus(ResultSet resultSet)
{
String segmentId = null;
@@ -885,8 +1071,8 @@ public class HeapMemorySegmentMetadataCache implements
SegmentMetadataCache
DateTimes.of(resultSet.getString(3)),
SqlSegmentsMetadataQuery.nullAndEmptySafeDate(resultSet.getString(4)),
true,
- null,
- null,
+ useSchemaCache ? resultSet.getString(5) : null,
+ useSchemaCache ? (Long) resultSet.getObject(6) : null,
null
);
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
index 6ce9e245614..c8e87ca4d55 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
@@ -105,6 +105,11 @@ public class Metric
*/
public static final String RETRIEVE_PENDING_SEGMENTS_DURATION_MILLIS =
METRIC_NAME_PREFIX + "fetchPending/time";
+ /**
+ * Time taken in milliseconds to fetch all segment schemas from the metadata
store.
+ */
+ public static final String RETRIEVE_SEGMENT_SCHEMAS_DURATION_MILLIS =
METRIC_NAME_PREFIX + "fetchSchemas/time";
+
/**
* Time taken to update the datasource snapshot in the cache.
*/
@@ -148,6 +153,11 @@ public class Metric
*/
public static final String SKIPPED_SEGMENTS = METRIC_NAME_PREFIX + "skipped";
+ /**
+ * Number of unparseable segment schema records skipped while refreshing the
cache.
+ */
+ public static final String SKIPPED_SEGMENT_SCHEMAS = METRIC_NAME_PREFIX +
"schema/skipped";
+
/**
* Number of unparseable pending segment records skipped while refreshing
the cache.
*/
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentSchemaRecord.java
b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentSchemaRecord.java
new file mode 100644
index 00000000000..b814f1a7fb8
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentSchemaRecord.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.segment.cache;
+
+import org.apache.druid.segment.SchemaPayload;
+
+/**
+ * Represents a single record in the druid_segmentSchemas table.
+ * <p>
+ * Simple holder that pairs a schema fingerprint with its {@link SchemaPayload}.
+ * Both fields are final and are returned as-is by the getters; the constructor
+ * performs no validation, so either value may be null if the caller passes null.
+ */
+public class SegmentSchemaRecord
+{
+ private final String fingerprint;
+ private final SchemaPayload payload;
+
+ public SegmentSchemaRecord(String fingerprint, SchemaPayload payload)
+ {
+ this.fingerprint = fingerprint;
+ this.payload = payload;
+ }
+
+ public String getFingerprint()
+ {
+ return fingerprint;
+ }
+
+ public SchemaPayload getPayload()
+ {
+ return payload;
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java
b/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java
new file mode 100644
index 00000000000..c58c1f97209
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.Map;
+
+/**
+ * No-op implementation of {@link SegmentSchemaCache} that always returns false
+ * for {@link #isEnabled()} and {@link #isInitialized()}.
+ * <p>
+ * Every other method overridden here throws an
+ * {@link UnsupportedOperationException}, including {@link #onLeaderStop()} and
+ * {@link #getStats()}. Callers are presumably expected to check
+ * {@link #isEnabled()} before invoking anything else on the cache
+ * (TODO confirm against call sites).
+ */
+public class NoopSegmentSchemaCache extends SegmentSchemaCache
+{
+ @Override
+ public boolean isEnabled()
+ {
+ return false;
+ }
+
+ @Override
+ public void setInitialized()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void onLeaderStop()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return false;
+ }
+
+ @Override
+ public void awaitInitialization() throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void resetSchemaForPublishedSegments(
+ Map<SegmentId, SegmentMetadata> usedSegmentIdToMetadata,
+ Map<String, SchemaPayload> schemaFingerprintToPayload
+ )
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addRealtimeSegmentSchema(SegmentId segmentId, SchemaPayloadPlus
schema)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addSchemaPendingBackfill(SegmentId segmentId, SchemaPayloadPlus
schema)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void markSchemaPersisted(SegmentId segmentId)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<SegmentId, SegmentMetadata> getPublishedSegmentMetadataMap()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isSchemaCached(SegmentId segmentId)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, SchemaPayload> getPublishedSchemaPayloadMap()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ SchemaPayloadPlus getTemporaryPublishedMetadataQueryResults(SegmentId id)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void segmentRemoved(SegmentId segmentId)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void realtimeSegmentRemoved(SegmentId segmentId)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, Integer> getStats()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
index 9a6830225dc..79276c70a92 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
@@ -94,6 +94,15 @@ public class SegmentSchemaCache
*/
private final AtomicInteger cacheMissCount = new AtomicInteger(0);
+ /**
+ * Indicates whether schema caching is enabled. This implementation
+ * unconditionally returns true; {@code NoopSegmentSchemaCache} overrides it
+ * to return false.
+ *
+ * @return true if schema caching is enabled.
+ */
+ public boolean isEnabled()
+ {
+ // Always return true since this implementation is bound only when caching
is enabled
+ return true;
+ }
+
public void setInitialized()
{
if (!isInitialized()) {
@@ -224,9 +233,9 @@ public class SegmentSchemaCache
}
// segment schema has been polled from the DB
- SegmentMetadata segmentMetadata = getSegmentMetadataMap().get(segmentId);
+ SegmentMetadata segmentMetadata =
getPublishedSegmentMetadataMap().get(segmentId);
if (segmentMetadata != null) {
- SchemaPayload schemaPayload =
getSchemaPayloadMap().get(segmentMetadata.getSchemaFingerprint());
+ SchemaPayload schemaPayload =
getPublishedSchemaPayloadMap().get(segmentMetadata.getSchemaFingerprint());
if (schemaPayload != null) {
return Optional.of(
new SchemaPayloadPlus(schemaPayload, segmentMetadata.getNumRows())
@@ -251,19 +260,27 @@ public class SegmentSchemaCache
private boolean isPublishedSegmentSchemaCached(SegmentId segmentId)
{
- SegmentMetadata segmentMetadata = getSegmentMetadataMap().get(segmentId);
+ SegmentMetadata segmentMetadata =
getPublishedSegmentMetadataMap().get(segmentId);
if (segmentMetadata != null) {
- return
getSchemaPayloadMap().containsKey(segmentMetadata.getSchemaFingerprint());
+ return
getPublishedSchemaPayloadMap().containsKey(segmentMetadata.getSchemaFingerprint());
}
return false;
}
- private Map<SegmentId, SegmentMetadata> getSegmentMetadataMap()
+ /**
+ * @return Immutable map from segment ID to {@link SegmentMetadata} for all
+ * published used segments currently present in this cache.
+ */
+ public Map<SegmentId, SegmentMetadata> getPublishedSegmentMetadataMap()
{
return publishedSegmentSchemas.get().segmentIdToMetadata;
}
- private Map<String, SchemaPayload> getSchemaPayloadMap()
+ /**
+ * @return Immutable map from schema fingerprint to {@link SchemaPayload} for
+ * all schema fingerprints currently present in this cache.
+ */
+ public Map<String, SchemaPayload> getPublishedSchemaPayloadMap()
{
return publishedSegmentSchemas.get().schemaFingerprintToPayload;
}
@@ -298,8 +315,8 @@ public class SegmentSchemaCache
return Map.of(
Metric.CACHE_MISSES, cacheMissCount.getAndSet(0),
Metric.REALTIME_SEGMENT_SCHEMAS, realtimeSegmentSchemas.size(),
- Metric.USED_SEGMENT_SCHEMAS, getSegmentMetadataMap().size(),
- Metric.USED_SEGMENT_SCHEMA_FINGERPRINTS, getSchemaPayloadMap().size(),
+ Metric.USED_SEGMENT_SCHEMAS, getPublishedSegmentMetadataMap().size(),
+ Metric.USED_SEGMENT_SCHEMA_FINGERPRINTS,
getPublishedSchemaPayloadMap().size(),
Metric.SCHEMAS_PENDING_BACKFILL, schemasPendingBackfill.size()
);
}
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java
index de4d85742a4..8c1ed5c59ae 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java
@@ -34,9 +34,9 @@ import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
-import org.skife.jdbi.v2.TransactionCallback;
import java.util.ArrayList;
import java.util.Collections;
@@ -154,7 +154,8 @@ public class SegmentSchemaManager
final int version
)
{
- connector.retryTransaction((TransactionCallback<Void>) (handle, status) ->
{
+ connector.retryTransaction((handle, status) -> {
+ final DateTime updateTime = DateTimes.nowUtc();
Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>();
for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) {
@@ -163,8 +164,8 @@ public class SegmentSchemaManager
segmentSchema.getSegmentSchemaMetadata().getSchemaPayload()
);
}
- persistSegmentSchema(handle, dataSource, version, schemaPayloadMap);
- updateSegmentWithSchemaInformation(handle, segmentSchemas);
+ persistSegmentSchema(handle, dataSource, version, schemaPayloadMap,
updateTime);
+ updateSegmentWithSchemaInformation(handle, segmentSchemas, updateTime);
return null;
}, 1, 3);
@@ -177,7 +178,8 @@ public class SegmentSchemaManager
final Handle handle,
final String dataSource,
final int version,
- final Map<String, SchemaPayload> fingerprintSchemaPayloadMap
+ final Map<String, SchemaPayload> fingerprintSchemaPayloadMap,
+ final DateTime updateTime
) throws JsonProcessingException
{
if (fingerprintSchemaPayloadMap.isEmpty()) {
@@ -244,7 +246,7 @@ public class SegmentSchemaManager
PreparedBatch schemaInsertBatch = handle.prepareBatch(insertSql);
for (List<String> partition : partitionedFingerprints) {
for (String fingerprint : partition) {
- final String now = DateTimes.nowUtc().toString();
+ final String now = updateTime.toString();
schemaInsertBatch.add()
.bind("created_date", now)
.bind("datasource", dataSource)
@@ -280,7 +282,8 @@ public class SegmentSchemaManager
*/
public void updateSegmentWithSchemaInformation(
final Handle handle,
- final List<SegmentSchemaMetadataPlus> batch
+ final List<SegmentSchemaMetadataPlus> batch,
+ final DateTime updateTime
)
{
log.debug("Updating segment with schemaFingerprint and numRows
information: [%s].", batch);
@@ -288,7 +291,11 @@ public class SegmentSchemaManager
// update schemaFingerprint and numRows in segments table
String updateSql =
StringUtils.format(
- "UPDATE %s SET schema_fingerprint = :schema_fingerprint, num_rows
= :num_rows WHERE id = :id",
+ "UPDATE %s"
+ + " SET schema_fingerprint = :schema_fingerprint,"
+ + " num_rows = :num_rows,"
+ + " used_status_last_updated = :last_updated"
+ + " WHERE id = :id",
dbTables.getSegmentsTable()
);
@@ -304,7 +311,8 @@ public class SegmentSchemaManager
segmentUpdateBatch.add()
.bind("id", segmentSchema.getSegmentId().toString())
.bind("schema_fingerprint", fingerprint)
- .bind("num_rows",
segmentSchema.getSegmentSchemaMetadata().getNumRows());
+ .bind("num_rows",
segmentSchema.getSegmentSchemaMetadata().getNumRows())
+ .bind("last_updated", updateTime.toString());
}
final int[] affectedRows = segmentUpdateBatch.execute();
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
index 7007607a2b0..2406af6f12b 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.metadata;
import com.google.common.collect.ImmutableList;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -79,7 +78,6 @@ public class IndexerSQLMetadataStorageCoordinatorMarkUsedTest
extends IndexerSql
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
)
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
index 5f5d0db9f58..48656aa93e9 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
@@ -29,12 +29,14 @@ import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
+import
org.apache.druid.metadata.segment.SqlSegmentMetadataReadOnlyTransactionFactory;
import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
@@ -101,6 +103,7 @@ public class
IndexerSQLMetadataStorageCoordinatorReadOnlyTest extends IndexerSql
mapper,
() -> new SegmentsMetadataManagerConfig(null, cacheMode),
derbyConnectorRule.metadataTablesConfigSupplier(),
+ new NoopSegmentSchemaCache(),
derbyConnector,
(corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
nameFormat,
@@ -151,15 +154,23 @@ public class
IndexerSQLMetadataStorageCoordinatorReadOnlyTest extends IndexerSql
NodeRole nodeRole
)
{
- final SegmentMetadataTransactionFactory transactionFactory = new
SqlSegmentMetadataTransactionFactory(
- mapper,
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- derbyConnector,
- leaderSelector,
- Set.of(nodeRole),
- segmentMetadataCache,
- emitter
- );
+ final SegmentMetadataTransactionFactory transactionFactory;
+ if (nodeRole.equals(NodeRole.COORDINATOR)) {
+ transactionFactory = new SqlSegmentMetadataReadOnlyTransactionFactory(
+ mapper,
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ derbyConnector
+ );
+ } else {
+ transactionFactory = new SqlSegmentMetadataTransactionFactory(
+ mapper,
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ derbyConnector,
+ leaderSelector,
+ segmentMetadataCache,
+ emitter
+ );
+ }
return new IndexerSQLMetadataStorageCoordinator(
transactionFactory,
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index d5572c5b058..9a1b7ce63ed 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.StringTuple;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.ExceptionMatcher;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
@@ -45,6 +44,7 @@ import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.FingerprintGenerator;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.metadata.SegmentSchemaTestUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@@ -153,6 +153,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
mapper,
() -> new SegmentsMetadataManagerConfig(null, cacheMode),
derbyConnectorRule.metadataTablesConfigSupplier(),
+ new NoopSegmentSchemaCache(),
derbyConnector,
(corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
nameFormat,
@@ -177,7 +178,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
leaderSelector,
- Set.of(NodeRole.OVERLORD),
segmentMetadataCache,
emitter
)
@@ -997,7 +997,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
public void testRetrieveSegmentForId()
{
coordinator.commitSegments(Set.of(defaultSegment), null);
- markAllSegmentsUnused(ImmutableSet.of(defaultSegment), DateTimes.nowUtc());
+ coordinator.markSegmentAsUnused(defaultSegment.getId());
Assert.assertEquals(
defaultSegment,
coordinator.retrieveSegmentForId(defaultSegment.getId())
@@ -2613,8 +2613,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals(1, identifier3.getShardSpec().getNumCorePartitions());
// now drop the used segment previously loaded:
- markAllSegmentsUnused(ImmutableSet.of(segment), DateTimes.nowUtc());
- refreshCache();
+ coordinator.markSegmentAsUnused(segment.getId());
// and final load, this reproduces an issue that could happen with
multiple streaming appends,
// followed by a reindex, followed by a drop, and more streaming data
coming in for same interval
@@ -2784,8 +2783,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
// 5) reverted compaction (by marking B_0 as unused)
// Revert compaction a manual metadata update which is basically the
following two steps:
- markAllSegmentsUnused(ImmutableSet.of(compactedSegment),
DateTimes.nowUtc()); // <- drop compacted segment
- refreshCache();
+ coordinator.markSegmentAsUnused(compactedSegment.getId());
// pending: version = A, id = 0,1,2
// version = B, id = 1
//
@@ -3638,8 +3636,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertTrue(coordinator.commitSegments(dataSegments, new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)).containsAll(dataSegments));
// Mark the tombstone as unused
- markAllSegmentsUnused(tombstones, DateTimes.nowUtc());
- refreshCache();
+ coordinator.markSegmentAsUnused(tombstoneSegment.getId());
final Collection<DataSegment> allUsedSegments =
coordinator.retrieveAllUsedSegments(
TestDataSource.WIKI,
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
index a4eddec3d4e..d98e54c3ea5 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
@@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.segment.SegmentMetadataTransaction;
@@ -99,7 +99,6 @@ public class
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
- Set.of(NodeRole.OVERLORD),
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
);
@@ -449,7 +448,8 @@ public class
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
handle,
"fooDataSource",
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
- schemaPayloadMapToPerist
+ schemaPayloadMapToPerist,
+ DateTimes.nowUtc()
);
return null;
});
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
index a5e30ae90a7..17a02f19a73 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
@@ -50,6 +50,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.PreparedBatchPart;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.util.StringMapper;
@@ -578,11 +579,12 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
);
}
- insertSegments(usedSegments, derbyConnectorRule, jsonMapper);
+ insertSegments(usedSegments, false, derbyConnectorRule, jsonMapper);
}
public static void insertSegments(
Set<DataSegmentPlus> dataSegments,
+ boolean includeSchema,
TestDerbyConnector.DerbyConnectorRule derbyConnectorRule,
ObjectMapper jsonMapper
)
@@ -590,23 +592,15 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
final TestDerbyConnector connector = derbyConnectorRule.getConnector();
final String table =
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+ final String sql = getSegmentInsertSql(includeSchema, table, connector);
connector.retryWithHandle(
handle -> {
- PreparedBatch preparedBatch = handle.prepareBatch(
- StringUtils.format(
- "INSERT INTO %1$s (id, dataSource, created_date, start,
%2$send%2$s, partitioned, version,"
- + " used, payload, used_status_last_updated,
upgraded_from_segment_id) "
- + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version,"
- + " :used, :payload, :used_status_last_updated,
:upgraded_from_segment_id)",
- table,
- connector.getQuoteString()
- )
- );
+ PreparedBatch preparedBatch = handle.prepareBatch(sql);
for (DataSegmentPlus segmentPlus : dataSegments) {
final DataSegment segment = segmentPlus.getDataSegment();
String id = segment.getId().toString();
- preparedBatch.add()
- .bind("id", id)
+ final PreparedBatchPart segmentRecord = preparedBatch.add();
+ segmentRecord.bind("id", id)
.bind("dataSource", segment.getDataSource())
.bind("created_date",
nullSafeString(segmentPlus.getCreatedDate()))
.bind("start",
segment.getInterval().getStart().toString())
@@ -617,6 +611,11 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
.bind("payload",
jsonMapper.writeValueAsBytes(segment))
.bind("used_status_last_updated",
nullSafeString(segmentPlus.getUsedStatusLastUpdatedDate()))
.bind("upgraded_from_segment_id",
segmentPlus.getUpgradedFromSegmentId());
+
+ if (includeSchema) {
+ segmentRecord.bind("num_rows", segmentPlus.getNumRows())
+ .bind("schema_fingerprint",
segmentPlus.getSchemaFingerprint());
+ }
}
final int[] affectedRows = preparedBatch.execute();
@@ -629,6 +628,31 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
);
}
+ /**
+ * Builds the parameterized INSERT statement used to write segment records
+ * into the given segments table. The {@code end} column name is wrapped in
+ * the quote string returned by the connector.
+ *
+ * @param includeSchema if true, the statement also contains the
+ *        {@code num_rows} and {@code schema_fingerprint} columns and their
+ *        corresponding named bind parameters
+ * @param table name of the segments table to insert into
+ * @param connector connector that supplies the quote string
+ * @return the formatted SQL string with named bind parameters
+ */
+ private static String getSegmentInsertSql(boolean includeSchema, String
table, TestDerbyConnector connector)
+ {
+ final String sql;
+ if (includeSchema) {
+ sql = StringUtils.format(
+ "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s,
partitioned, version,"
+ + " used, payload, used_status_last_updated,
upgraded_from_segment_id, num_rows, schema_fingerprint) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version,"
+ + " :used, :payload, :used_status_last_updated,
:upgraded_from_segment_id, :num_rows, :schema_fingerprint)",
+ table,
+ connector.getQuoteString()
+ );
+ } else {
+ sql = StringUtils.format(
+ "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s,
partitioned, version,"
+ + " used, payload, used_status_last_updated,
upgraded_from_segment_id) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version,"
+ + " :used, :payload, :used_status_last_updated,
:upgraded_from_segment_id)",
+ table,
+ connector.getQuoteString()
+ );
+ }
+ return sql;
+ }
+
@Nullable
private static String nullSafeString(DateTime date)
{
diff --git
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
index 7e2aff60900..fb310430ac3 100644
---
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
@@ -57,7 +57,7 @@ public class SqlSegmentsMetadataManagerProviderTest
connector,
lifecycle,
segmentSchemaCache,
- CentralizedDatasourceSchemaConfig.create(),
+ CentralizedDatasourceSchemaConfig::create,
NoopServiceEmitter.instance()
);
SegmentsMetadataManager manager = provider.get();
diff --git
a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
index 2f5c18dcd79..e5554b931ee 100644
---
a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
+++
b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
@@ -21,7 +21,6 @@ package org.apache.druid.metadata.segment;
import com.google.common.base.Suppliers;
import org.apache.druid.client.DataSourcesSnapshot;
-import org.apache.druid.error.ExceptionMatcher;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
@@ -31,16 +30,16 @@ import
org.apache.druid.metadata.SqlSegmentsMetadataManagerTestBase;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.Metric;
-import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.util.Sets;
-import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
@@ -82,7 +81,8 @@ public class SqlSegmentsMetadataManagerV2Test extends
SqlSegmentsMetadataManager
}
private void initManager(
- SegmentMetadataCache.UsageMode cacheMode
+ SegmentMetadataCache.UsageMode cacheMode,
+ boolean useSchemaCache
)
{
segmentMetadataCacheExec = new BlockingExecutorService("test");
@@ -90,6 +90,7 @@ public class SqlSegmentsMetadataManagerV2Test extends
SqlSegmentsMetadataManager
jsonMapper,
Suppliers.ofInstance(new
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
Suppliers.ofInstance(storageConfig),
+ useSchemaCache ? new SegmentSchemaCache() : new
NoopSegmentSchemaCache(),
connector,
(poolSize, name) -> new WrappingScheduledExecutorService(name,
segmentMetadataCacheExec, false),
emitter
@@ -103,7 +104,7 @@ public class SqlSegmentsMetadataManagerV2Test extends
SqlSegmentsMetadataManager
connector,
Suppliers.ofInstance(config),
derbyConnectorRule.metadataTablesConfigSupplier(),
- CentralizedDatasourceSchemaConfig.create(),
+ CentralizedDatasourceSchemaConfig::create,
emitter,
jsonMapper
);
@@ -132,7 +133,7 @@ public class SqlSegmentsMetadataManagerV2Test extends
SqlSegmentsMetadataManager
@Test
public void test_manager_usesCachedSegments_ifCacheIsEnabled()
{
- initManager(SegmentMetadataCache.UsageMode.ALWAYS);
+ initManager(SegmentMetadataCache.UsageMode.ALWAYS, false);
manager.startPollingDatabasePeriodically();
Assert.assertTrue(manager.isPollingDatabasePeriodically());
@@ -146,13 +147,16 @@ public class SqlSegmentsMetadataManagerV2Test extends
SqlSegmentsMetadataManager
emitter.verifyNotEmitted("segment/poll/time");
emitter.verifyNotEmitted("segment/pollWithSchema/time");
+ emitter.verifyNotEmitted(Metric.RETRIEVE_SEGMENT_SCHEMAS_DURATION_MILLIS);
+ emitter.verifyNotEmitted("segment/schemaCache/used/count");
+
emitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 2);
}
@Test
public void test_manager_pollsSegments_ifCacheIsDisabled()
{
- initManager(SegmentMetadataCache.UsageMode.NEVER);
+ initManager(SegmentMetadataCache.UsageMode.NEVER, false);
manager.startPollingDatabasePeriodically();
Assert.assertTrue(manager.isPollingDatabasePeriodically());
@@ -167,34 +171,25 @@ public class SqlSegmentsMetadataManagerV2Test extends
SqlSegmentsMetadataManager
}
@Test
- public void test_manager_throwsException_ifBothCacheAndSchemaAreEnabled()
+ public void
test_manager_usesCachedSegmentsAndSchemas_ifBothCacheAndSchemaAreEnabled()
{
- MatcherAssert.assertThat(
- Assert.assertThrows(
- IllegalArgumentException.class,
- () -> new SqlSegmentsMetadataManagerV2(
- new NoopSegmentMetadataCache() {
- @Override
- public boolean isEnabled()
- {
- return true;
- }
- },
- segmentSchemaCache,
- connector,
- Suppliers.ofInstance(config),
- derbyConnectorRule.metadataTablesConfigSupplier(),
- CentralizedDatasourceSchemaConfig.enabled(true),
- emitter,
- jsonMapper
- )
- ),
- ExceptionMatcher.of(IllegalArgumentException.class).expectMessageIs(
- "Segment metadata incremental
cache['druid.manager.segments.useIncrementalCache']"
- + " and segment schema
cache['druid.centralizedDatasourceSchema.enabled']"
- + " must not be enabled together."
- )
- );
+ initManager(SegmentMetadataCache.UsageMode.ALWAYS, true);
+
+ manager.startPollingDatabasePeriodically();
+ Assert.assertTrue(manager.isPollingDatabasePeriodically());
+
+ syncSegmentMetadataCache();
+ verifyDatasourceSnapshot();
+
+ // isPolling returns true even after stop since cache is still polling the
metadata store
+ manager.stopPollingDatabasePeriodically();
+ Assert.assertTrue(manager.isPollingDatabasePeriodically());
+
+ emitter.verifyNotEmitted("segment/poll/time");
+ emitter.verifyNotEmitted("segment/pollWithSchema/time");
+ emitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 2);
+ emitter.verifyEmitted(Metric.RETRIEVE_SEGMENT_SCHEMAS_DURATION_MILLIS, 2);
+ emitter.verifyEmitted("segment/schemaCache/used/count", 2);
}
private void verifyDatasourceSnapshot()
diff --git
a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
index 48983284071..2a5940b0495 100644
---
a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
@@ -32,8 +32,16 @@ import
org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SegmentMetadata;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.FingerprintGenerator;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaTestUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
@@ -52,13 +60,14 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class HeapMemorySegmentMetadataCacheTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule
- = new TestDerbyConnector.DerbyConnectorRule();
+ = new
TestDerbyConnector.DerbyConnectorRule(CentralizedDatasourceSchemaConfig.enabled(true));
private BlockingExecutorService pollExecutor;
private ScheduledExecutorFactory executorFactory;
@@ -66,6 +75,8 @@ public class HeapMemorySegmentMetadataCacheTest
private StubServiceEmitter serviceEmitter;
private HeapMemorySegmentMetadataCache cache;
+ private SegmentSchemaCache schemaCache;
+ private SegmentSchemaTestUtils schemaTestUtils;
@Before
public void setup()
@@ -76,8 +87,10 @@ public class HeapMemorySegmentMetadataCacheTest
serviceEmitter = new StubServiceEmitter();
derbyConnector.createSegmentTable();
+ derbyConnector.createSegmentSchemasTable();
derbyConnector.createPendingSegmentsTable();
+ schemaTestUtils = new SegmentSchemaTestUtils(derbyConnectorRule,
derbyConnector, TestHelper.JSON_MAPPER);
EmittingLogger.registerEmitter(serviceEmitter);
}
@@ -90,26 +103,41 @@ public class HeapMemorySegmentMetadataCacheTest
}
}
+ private void setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode)
+ {
+ setupTargetWithCaching(cacheMode, false);
+ }
+
/**
* Creates the target {@link #cache} to be tested in the current test.
*/
- private void setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode)
+ private void setupTargetWithCaching(SegmentMetadataCache.UsageMode
cacheMode, boolean useSchemaCache)
{
if (cache != null) {
throw new ISE("Test target has already been initialized with
caching[%s]", cache.isEnabled());
}
final SegmentsMetadataManagerConfig metadataManagerConfig
= new SegmentsMetadataManagerConfig(null, cacheMode);
+ schemaCache = useSchemaCache ? new SegmentSchemaCache() : new
NoopSegmentSchemaCache();
cache = new HeapMemorySegmentMetadataCache(
TestHelper.JSON_MAPPER,
() -> metadataManagerConfig,
derbyConnectorRule.metadataTablesConfigSupplier(),
+ schemaCache,
derbyConnector,
executorFactory,
serviceEmitter
);
}
+ private void setupAndSyncCacheWithSchema()
+ {
+ setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS, true);
+ cache.start();
+ cache.becomeLeader();
+ syncCacheAfterBecomingLeader();
+ }
+
private void setupAndSyncCache()
{
setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);
@@ -692,10 +720,141 @@ public class HeapMemorySegmentMetadataCacheTest
serviceEmitter.verifyValue(Metric.DELETED_DATASOURCES, 1L);
}
+ @Test
+ public void test_sync_addsUsedSegmentSchema_ifNotPresentInCache()
+ {
+ setupAndSyncCacheWithSchema();
+
+ Assert.assertTrue(
+ schemaCache.getPublishedSchemaPayloadMap().isEmpty()
+ );
+
+ final SchemaPayload payload = new SchemaPayload(
+ RowSignature.builder().add("col1", null).build()
+ );
+ final String fingerprint = getSchemaFingerprint(payload);
+ schemaTestUtils.insertSegmentSchema(
+ TestDataSource.WIKI,
+ Map.of(fingerprint, payload),
+ Set.of(fingerprint)
+ );
+
+ syncCache();
+ serviceEmitter.verifyValue("segment/schemaCache/usedFingerprint/count",
1L);
+
+ Assert.assertEquals(
+ Map.of(fingerprint, payload),
+ schemaCache.getPublishedSchemaPayloadMap()
+ );
+ }
+
+ @Test
+ public void test_sync_removesUsedSegmentSchema_ifNotPresentInMetadataStore()
+ {
+ setupAndSyncCacheWithSchema();
+
+ final SchemaPayload payload = new SchemaPayload(
+ RowSignature.builder().add("col1", null).build()
+ );
+ final String fingerprint = getSchemaFingerprint(payload);
+
+ schemaCache.resetSchemaForPublishedSegments(
+ Map.of(),
+ Map.of(fingerprint, payload)
+ );
+ Assert.assertEquals(
+ Map.of(fingerprint, payload),
+ schemaCache.getPublishedSchemaPayloadMap()
+ );
+
+ syncCache();
+ serviceEmitter.verifyValue("segment/schemaCache/usedFingerprint/count",
0L);
+
+ Assert.assertTrue(
+ schemaCache.getPublishedSchemaPayloadMap().isEmpty()
+ );
+ }
+
+ @Test
+ public void test_sync_addsUsedSegmentMetadata_ifNotPresentInCache()
+ {
+ setupAndSyncCacheWithSchema();
+
+ Assert.assertTrue(
+ schemaCache.getPublishedSegmentMetadataMap().isEmpty()
+ );
+
+ final SchemaPayload payload = new SchemaPayload(
+ RowSignature.builder().add("col1", null).build()
+ );
+ final String fingerprint = getSchemaFingerprint(payload);
+
+ final DataSegmentPlus usedSegmentPlus
+ = CreateDataSegments.ofDatasource(TestDataSource.WIKI)
+
.withNumRows(10L).withSchemaFingerprint(fingerprint)
+ .updatedNow().markUsed().asPlus();
+ insertSegmentsInMetadataStoreWithSchema(usedSegmentPlus);
+
+ syncCache();
+ serviceEmitter.verifyValue(Metric.PERSISTED_USED_SEGMENTS, 1L);
+ serviceEmitter.verifyValue(Metric.CACHED_USED_SEGMENTS, 1L);
+ serviceEmitter.verifyValue(Metric.UPDATED_USED_SEGMENTS, 1L);
+ serviceEmitter.verifyValue("segment/schemaCache/used/count", 1L);
+
+ Assert.assertEquals(
+ Map.of(usedSegmentPlus.getDataSegment().getId(), new
SegmentMetadata(10L, fingerprint)),
+ schemaCache.getPublishedSegmentMetadataMap()
+ );
+ }
+
+ @Test
+ public void
test_sync_removesUsedSegmentMetadata_ifNotPresentInMetadataStore()
+ {
+ setupAndSyncCacheWithSchema();
+
+ final SchemaPayload payload = new SchemaPayload(
+ RowSignature.builder().add("col1", null).build()
+ );
+ final String fingerprint = getSchemaFingerprint(payload);
+ final SegmentId segmentId = SegmentId.dummy(TestDataSource.WIKI);
+ final SegmentMetadata metadata = new SegmentMetadata(10L, fingerprint);
+
+ schemaCache.resetSchemaForPublishedSegments(
+ Map.of(segmentId, metadata),
+ Map.of()
+ );
+ Assert.assertEquals(
+ Map.of(segmentId, metadata),
+ schemaCache.getPublishedSegmentMetadataMap()
+ );
+
+ syncCache();
+ serviceEmitter.verifyValue("segment/schemaCache/used/count", 0L);
+
+ Assert.assertTrue(
+ schemaCache.getPublishedSegmentMetadataMap().isEmpty()
+ );
+ }
+
+ private static String getSchemaFingerprint(SchemaPayload payload)
+ {
+ return new
FingerprintGenerator(TestHelper.JSON_MAPPER).generateFingerprint(
+ payload,
+ TestDataSource.WIKI,
+ CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+ );
+ }
+
private void insertSegmentsInMetadataStore(Set<DataSegmentPlus> segments)
{
IndexerSqlMetadataStorageCoordinatorTestBase
- .insertSegments(segments, derbyConnectorRule, TestHelper.JSON_MAPPER);
+ .insertSegments(segments, false, derbyConnectorRule,
TestHelper.JSON_MAPPER);
+ }
+
+ private void insertSegmentsInMetadataStoreWithSchema(DataSegmentPlus...
segments)
+ {
+ IndexerSqlMetadataStorageCoordinatorTestBase
+ .insertSegments(Set.of(segments), true, derbyConnectorRule,
TestHelper.JSON_MAPPER);
}
private void updateSegmentInMetadataStore(DataSegmentPlus segment)
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
index b5775409e69..ab3c6efcee1 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
@@ -61,6 +61,8 @@ public class CreateDataSegments
private Boolean used;
private DateTime lastUpdatedTime;
private String upgradedFromSegmentId;
+ private String schemaFingerprint;
+ private Long numRows;
public static CreateDataSegments ofDatasource(String datasource)
{
@@ -109,6 +111,18 @@ public class CreateDataSegments
return this;
}
+ public CreateDataSegments withNumRows(Long numRows)
+ {
+ this.numRows = numRows;
+ return this;
+ }
+
+ public CreateDataSegments withSchemaFingerprint(String schemaFingerprint)
+ {
+ this.schemaFingerprint = schemaFingerprint;
+ return this;
+ }
+
public CreateDataSegments markUnused()
{
this.used = false;
@@ -186,8 +200,8 @@ public class CreateDataSegments
DateTimes.nowUtc(),
lastUpdatedTime,
used,
- null,
- null,
+ schemaFingerprint,
+ numRows,
upgradedFromSegmentId
);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
index c5e2012d02a..dae4d48a1bf 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
@@ -229,7 +229,8 @@ public class KillUnreferencedSegmentSchemaTest
handle,
"foo",
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
- Collections.singletonMap(fingerprint, schemaPayload)
+ Collections.singletonMap(fingerprint, schemaPayload),
+ DateTimes.nowUtc()
);
return null;
}
@@ -304,7 +305,8 @@ public class KillUnreferencedSegmentSchemaTest
handle,
"foo",
0,
- Collections.singletonMap(fingerprintOldVersion, schemaPayload)
+ Collections.singletonMap(fingerprintOldVersion, schemaPayload),
+ DateTimes.nowUtc()
);
return null;
}
@@ -316,7 +318,8 @@ public class KillUnreferencedSegmentSchemaTest
handle,
"foo",
1,
- Collections.singletonMap(fingerprintNewVersion, schemaPayload)
+ Collections.singletonMap(fingerprintNewVersion, schemaPayload),
+ DateTimes.nowUtc()
);
return null;
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 4adf29d6238..5a94d73a3e8 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -47,6 +47,8 @@ import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.MetadataConfigModule;
+import org.apache.druid.guice.MetadataManagerModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentSchemaCacheModule;
import org.apache.druid.guice.SupervisorCleanupModule;
@@ -61,12 +63,8 @@ import
org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.metadata.MetadataRuleManager;
-import org.apache.druid.metadata.MetadataRuleManagerProvider;
import org.apache.druid.metadata.MetadataStorage;
import org.apache.druid.metadata.MetadataStorageProvider;
-import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
import org.apache.druid.query.lookup.LookupSerdeModule;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.segment.metadata.SegmentMetadataCacheConfig;
@@ -74,7 +72,6 @@ import
org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.coordinator.CloneStatusManager;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.DruidCoordinator;
-import org.apache.druid.server.coordinator.MetadataManager;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs;
import org.apache.druid.server.coordinator.config.CoordinatorPeriodConfig;
@@ -145,7 +142,7 @@ public class CliCoordinator extends ServerRunnable
{
this.properties = properties;
beOverlord = isOverlord(properties);
- isSegmentSchemaCacheEnabled = isSegmentSchemaCacheEnabled(properties);
+ isSegmentSchemaCacheEnabled =
MetadataConfigModule.isSegmentSchemaCacheEnabled(properties);
if (beOverlord) {
log.info("Coordinator is configured to act as Overlord as well (%s =
true).", AS_OVERLORD_PROPERTY);
@@ -167,6 +164,7 @@ public class CliCoordinator extends ServerRunnable
List<Module> modules = new ArrayList<>();
modules.add(JettyHttpClientModule.global());
+ modules.add(new MetadataManagerModule());
if (isSegmentSchemaCacheEnabled) {
validateCentralizedDatasourceSchemaConfig(properties);
@@ -213,19 +211,9 @@ public class CliCoordinator extends ServerRunnable
binder.bind(DirectDruidClientFactory.class).toProvider(Providers.of(null));
}
- binder.bind(SegmentsMetadataManager.class)
- .toProvider(SegmentsMetadataManagerProvider.class)
- .in(ManageLifecycle.class);
-
- binder.bind(MetadataRuleManager.class)
- .toProvider(MetadataRuleManagerProvider.class)
- .in(ManageLifecycle.class);
-
binder.bind(LookupCoordinatorManager.class).in(LazySingleton.class);
binder.bind(CloneStatusManager.class).in(LazySingleton.class);
- binder.bind(CoordinatorConfigManager.class);
- binder.bind(MetadataManager.class);
binder.bind(DruidCoordinator.class);
binder.bind(CompactionStatusTracker.class).in(LazySingleton.class);
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 5cdf7fefd3e..95fa654674d 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -50,6 +50,7 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ListProvider;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.MetadataManagerModule;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.SupervisorModule;
import org.apache.druid.guice.annotations.Json;
@@ -108,8 +109,6 @@ import
org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManag
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.lookup.LookupSerdeModule;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -118,7 +117,6 @@ import
org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import
org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
import org.apache.druid.server.compaction.CompactionStatusTracker;
-import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.http.RedirectFilter;
@@ -196,6 +194,7 @@ public class CliOverlord extends ServerRunnable
protected List<? extends Module> getModules(final boolean standalone)
{
return ImmutableList.of(
+ standalone ? new MetadataManagerModule() : binder -> {},
new Module()
{
@Override
@@ -211,10 +210,6 @@ public class CliOverlord extends ServerRunnable
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8290);
binder.bind(CompactionStatusTracker.class).in(LazySingleton.class);
- binder.bind(SegmentsMetadataManager.class)
- .toProvider(SegmentsMetadataManagerProvider.class)
- .in(ManageLifecycle.class);
-
binder.bind(CoordinatorConfigManager.class).in(LazySingleton.class);
}
JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord",
CoordinatorOverlordServiceConfig.class);
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 065a6430563..e9cbab108d3 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -73,12 +73,9 @@ import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.TaskToolboxFactory;
-import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
-import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.config.TaskStorageConfig;
import
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import
org.apache.druid.indexing.common.task.batch.parallel.DeepStorageShuffleClient;
@@ -86,11 +83,8 @@ import
org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProviderImpl;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
-import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
-import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
@@ -99,7 +93,6 @@ import
org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QuerySegmentWalker;
@@ -485,29 +478,9 @@ public class CliPeon extends GuiceRunnable
private static void configureTaskActionClient(Binder binder)
{
- PolyBind.createChoice(
- binder,
- "druid.peon.mode",
- Key.get(TaskActionClientFactory.class),
- Key.get(RemoteTaskActionClientFactory.class)
- );
- final MapBinder<String, TaskActionClientFactory> taskActionBinder =
- PolyBind.optionBinder(binder, Key.get(TaskActionClientFactory.class));
- taskActionBinder
- .addBinding("local")
- .to(LocalTaskActionClientFactory.class)
- .in(LazySingleton.class);
- // all of these bindings are so that we can run the peon in local mode
- JsonConfigProvider.bind(binder, "druid.indexer.storage",
TaskStorageConfig.class);
-
binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
- binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
- binder.bind(IndexerMetadataStorageCoordinator.class)
- .to(IndexerSQLMetadataStorageCoordinator.class)
+ binder.bind(TaskActionClientFactory.class)
+ .to(RemoteTaskActionClientFactory.class)
.in(LazySingleton.class);
- taskActionBinder
- .addBinding("remote")
- .to(RemoteTaskActionClientFactory.class)
- .in(LazySingleton.class);
binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.PEON);
}
diff --git a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
index 5faf6e134c6..3dc1711a50e 100644
--- a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
+++ b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
@@ -33,13 +33,13 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.MetadataConfigModule;
import org.apache.druid.guice.ServerViewModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.server.DruidNode;
import java.lang.annotation.Annotation;
@@ -53,9 +53,6 @@ import java.util.Set;
*/
public abstract class ServerRunnable extends GuiceRunnable
{
- private static final String CENTRALIZED_DATASOURCE_SCHEMA_ENABLED =
- CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX + ".enabled";
-
private static final EmittingLogger log = new
EmittingLogger(ServerRunnable.class);
public ServerRunnable(Logger log)
@@ -207,7 +204,7 @@ public abstract class ServerRunnable extends GuiceRunnable
protected static void validateCentralizedDatasourceSchemaConfig(Properties
properties)
{
- if (isSegmentSchemaCacheEnabled(properties)) {
+ if (MetadataConfigModule.isSegmentSchemaCacheEnabled(properties)) {
String serverViewType =
properties.getProperty(ServerViewModule.SERVERVIEW_TYPE_PROPERTY);
if (serverViewType != null &&
!serverViewType.equals(ServerViewModule.SERVERVIEW_TYPE_HTTP)) {
throw DruidException
@@ -221,15 +218,10 @@ public abstract class ServerRunnable extends GuiceRunnable
ServerViewModule.SERVERVIEW_TYPE_PROPERTY,
serverViewType,
ServerViewModule.SERVERVIEW_TYPE_HTTP,
- CENTRALIZED_DATASOURCE_SCHEMA_ENABLED
+ MetadataConfigModule.CENTRALIZED_DATASOURCE_SCHEMA_ENABLED
)
);
}
}
}
-
- protected static boolean isSegmentSchemaCacheEnabled(Properties properties)
- {
- return
Boolean.parseBoolean(properties.getProperty(CENTRALIZED_DATASOURCE_SCHEMA_ENABLED));
- }
}
diff --git
a/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
b/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
new file mode 100644
index 00000000000..6d501f9f768
--- /dev/null
+++ b/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Module;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.metadata.MetadataRuleManagerConfig;
+import org.apache.druid.metadata.MetadataRuleManagerProvider;
+import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.metadata.SQLMetadataRuleManagerProvider;
+import org.apache.druid.metadata.SQLMetadataSupervisorManager;
+import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
+import org.apache.druid.metadata.SqlSegmentsMetadataManagerProvider;
+import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
+import
org.apache.druid.metadata.segment.SqlSegmentMetadataReadOnlyTransactionFactory;
+import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
+import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
+import org.apache.druid.server.coordinator.CoordinatorConfigManager;
+import org.apache.druid.server.coordinator.MetadataManager;
+
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Module used by Overlord and Coordinator to bind the following metadata
managers:
+ * <ul>
+ * <li>{@link MetadataManager} - Coordinator only</li>
+ * <li>{@link MetadataRuleManager} - Coordinator only</li>
+ * <li>{@link MetadataSupervisorManager}</li>
+ * <li>{@link SegmentsMetadataManager}</li>
+ * <li>{@link IndexerMetadataStorageCoordinator}</li>
+ * <li>{@link CoordinatorConfigManager}</li>
+ * <li>{@link SegmentMetadataCache}</li>
+ * <li>{@link SegmentSchemaCache} - real cache only on a Coordinator with the schema cache enabled, no-op otherwise</li>
+ * </ul>
+ */
+public class MetadataManagerModule implements Module
+{
+ private Set<NodeRole> nodeRoles;
+ private boolean isSchemaCacheEnabled;
+
+ @Inject
+ public void configure(
+ Properties properties,
+ @Self Set<NodeRole> nodeRoles
+ )
+ {
+ this.nodeRoles = nodeRoles;
+ this.isSchemaCacheEnabled =
MetadataConfigModule.isSegmentSchemaCacheEnabled(properties);
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ // Common dependencies
+ binder.bind(CoordinatorConfigManager.class).in(LazySingleton.class);
+
+ binder.bind(MetadataSupervisorManager.class)
+ .to(SQLMetadataSupervisorManager.class)
+ .in(LazySingleton.class);
+
+ JsonConfigProvider.bind(binder, "druid.manager.segments",
SegmentsMetadataManagerConfig.class);
+ binder.bind(SegmentsMetadataManagerProvider.class)
+ .to(SqlSegmentsMetadataManagerProvider.class)
+ .in(LazySingleton.class);
+ binder.bind(SegmentsMetadataManager.class)
+ .toProvider(SegmentsMetadataManagerProvider.class)
+ .in(ManageLifecycle.class);
+
+ binder.bind(IndexerMetadataStorageCoordinator.class)
+ .to(IndexerSQLMetadataStorageCoordinator.class)
+ .in(ManageLifecycle.class);
+ binder.bind(SegmentMetadataCache.class)
+ .to(HeapMemorySegmentMetadataCache.class)
+ .in(LazySingleton.class);
+
+ // Coordinator-only dependencies
+ if (nodeRoles.contains(NodeRole.COORDINATOR)) {
+ JsonConfigProvider.bind(binder, "druid.manager.rules",
MetadataRuleManagerConfig.class);
+ binder.bind(MetadataRuleManagerProvider.class)
+ .to(SQLMetadataRuleManagerProvider.class)
+ .in(LazySingleton.class);
+ binder.bind(MetadataRuleManager.class)
+ .toProvider(MetadataRuleManagerProvider.class)
+ .in(ManageLifecycle.class);
+
+ binder.bind(MetadataManager.class).in(LazySingleton.class);
+ }
+
+ if (nodeRoles.contains(NodeRole.COORDINATOR) && isSchemaCacheEnabled) {
+ binder.bind(SegmentSchemaCache.class).in(LazySingleton.class);
+ } else {
+ binder.bind(SegmentSchemaCache.class)
+ .to(NoopSegmentSchemaCache.class)
+ .in(LazySingleton.class);
+ }
+
+ // Role-dependent binding: Overlord uses the regular transaction factory, other roles the read-only one
+ if (nodeRoles.contains(NodeRole.OVERLORD)) {
+ binder.bind(SegmentMetadataTransactionFactory.class)
+ .to(SqlSegmentMetadataTransactionFactory.class)
+ .in(LazySingleton.class);
+ } else {
+ binder.bind(SegmentMetadataTransactionFactory.class)
+ .to(SqlSegmentMetadataReadOnlyTransactionFactory.class)
+ .in(LazySingleton.class);
+ }
+ }
+}
diff --git
a/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
b/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
index 377950b0370..744d56a89e0 100644
---
a/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
+++
b/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
@@ -44,7 +44,6 @@ import
org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.segment.metadata.SegmentMetadataQuerySegmentWalker;
-import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QuerySchedulerProvider;
@@ -83,7 +82,6 @@ public class SegmentSchemaCacheModule implements Module
.toProvider(Key.get(QuerySchedulerProvider.class, Global.class))
.in(LazySingleton.class);
binder.bind(QuerySchedulerProvider.class).in(LazySingleton.class);
- binder.bind(SegmentSchemaCache.class).in(LazySingleton.class);
binder.bind(QuerySegmentWalker.class).to(SegmentMetadataQuerySegmentWalker.class).in(LazySingleton.class);
LifecycleModule.register(binder, CoordinatorSegmentMetadataCache.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]