This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c61c3785a0c Followup changes to 15817 (Segment schema publishing and polling) (#16368)
c61c3785a0c is described below
commit c61c3785a0c473ed4c83af83f0db6a4ff638cf6d
Author: Rishabh Singh <[email protected]>
AuthorDate: Fri May 3 19:13:52 2024 +0530
Followup changes to 15817 (Segment schema publishing and polling) (#16368)
* Fix build
* Nit changes in KillUnreferencedSegmentSchema
* Replace reference to the abbreviation SMQ with Metadata Query, rename
inTransit maps in schema cache
* nitpicks
* Remove reference to smq abbreviation from integration-tests
* Remove reference to smq abbreviation from integration-tests
* minor change
* Update index.md
* Add delimiter while computing schema fingerprint hash
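The last bullet guards against a classic concatenation ambiguity: joining schema parts without a separator lets distinct inputs (for example `["ab", "c"]` and `["a", "bc"]`) produce identical bytes and therefore identical fingerprints. A minimal sketch of the idea (the class and method names here are illustrative, not the actual `FingerprintGenerator` code):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

public class FingerprintSketch
{
  /**
   * Join the parts with an explicit delimiter before hashing.
   * Without one, ["ab", "c"] and ["a", "bc"] concatenate to the
   * same byte sequence and collide to the same fingerprint.
   */
  static String fingerprint(List<String> parts)
  {
    try {
      // NUL is a safe separator here: it cannot appear in a column name.
      String joined = String.join("\u0000", parts);
      MessageDigest sha = MessageDigest.getInstance("SHA-256");
      byte[] hash = sha.digest(joined.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : hash) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    }
    catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args)
  {
    // Distinct inputs now produce distinct fingerprints.
    System.out.println(
        fingerprint(List.of("ab", "c")).equals(fingerprint(List.of("a", "bc")))
    ); // prints false
  }
}
```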
---
docs/configuration/index.md | 25 ++++-
docs/operations/metrics.md | 26 +++---
.../actions/SegmentTransactionalInsertAction.java | 9 +-
.../common/task/AbstractBatchIndexTask.java | 6 +-
.../druid/indexing/common/TestIndexTask.java | 2 +-
.../indexing/common/task/IngestionTestBase.java | 16 +++-
.../AbstractParallelIndexSupervisorTaskTest.java | 4 +-
.../indexing/overlord/RemoteTaskRunnerTest.java | 6 +-
...se.cds-coordinator-metadata-query-disabled.yml} | 0
integration-tests/docker/druid.sh | 2 +-
...inator-metadata-query-disabled-sample-data.sql} | 0
integration-tests/script/docker_compose_args.sh | 4 +-
.../java/org/apache/druid/tests/TestNGGroup.java | 2 +-
.../tests/indexer/ITAppendBatchIndexTest.java | 2 +-
.../ITAppenderatorDriverRealtimeIndexTaskTest.java | 2 +-
.../ITBestEffortRollupParallelIndexTest.java | 2 +-
.../ITCombiningInputSourceParallelIndexTest.java | 2 +-
.../indexer/ITCompactionSparseColumnTest.java | 2 +-
.../druid/tests/indexer/ITCompactionTaskTest.java | 2 +-
.../druid/tests/indexer/ITHttpInputSourceTest.java | 2 +-
.../apache/druid/tests/indexer/ITIndexerTest.java | 2 +-
...exingServiceNonTransactionalSerializedTest.java | 2 +-
...IndexingServiceTransactionalSerializedTest.java | 2 +-
.../ITLocalInputSourceAllInputFormatTest.java | 2 +-
.../tests/indexer/ITOverwriteBatchIndexTest.java | 2 +-
.../indexer/ITPerfectRollupParallelIndexTest.java | 2 +-
.../tests/indexer/ITRealtimeIndexTaskTest.java | 2 +-
.../indexer/ITSystemTableBatchIndexTaskTest.java | 2 +-
...ithSchema.java => DataSegmentWithMetadata.java} | 4 +-
.../org/apache/druid/segment/SegmentMetadata.java | 4 +-
.../druid/metadata/SQLMetadataConnector.java | 13 +--
.../metadata/SQLMetadataSegmentPublisher.java | 2 +-
.../druid/metadata/SqlSegmentsMetadataManager.java | 2 +-
.../metadata/CoordinatorSegmentMetadataCache.java | 4 +-
.../segment/metadata/FingerprintGenerator.java | 7 ++
.../metadata/SegmentSchemaBackFillQueue.java | 4 +-
.../druid/segment/metadata/SegmentSchemaCache.java | 104 ++++++++++++---------
.../realtime/appenderator/AppenderatorImpl.java | 16 ++--
.../realtime/appenderator/BatchAppenderator.java | 16 ++--
.../realtime/appenderator/StreamAppenderator.java | 16 ++--
.../druid/server/coordinator/DruidCoordinator.java | 4 +-
...uty.java => KillUnreferencedSegmentSchema.java} | 12 +--
.../SqlSegmentsMetadataManagerTestBase.java | 31 ------
.../CoordinatorSegmentMetadataCacheTest.java | 2 +-
.../segment/metadata/FingerprintGeneratorTest.java | 2 +-
.../segment/metadata/SegmentSchemaCacheTest.java | 6 +-
...java => KillUnreferencedSegmentSchemaTest.java} | 18 ++--
47 files changed, 213 insertions(+), 186 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 6b898792252..91f4db60e69 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -279,7 +279,7 @@ The `file` request logger stores daily request logs on disk.
|--------|-----------|-------|
|`druid.request.logging.dir`|Historical, Realtime, and Broker services
maintain request logs of all of the requests they get (interaction is via POST,
so normal request logs don’t generally capture information about the actual
query), this specifies the directory to store the request logs in|none|
|`druid.request.logging.filePattern`|[Joda datetime
format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)
for each file|"yyyy-MM-dd'.log'"|
-| `druid.request.logging.durationToRetain`| Period to retain the request logs on disk. The period should be at least longer than `P1D`.| none
+| `druid.request.logging.durationToRetain`| Period to retain the request logs on disk. The period should be at least longer than `P1D`.| none|
The format of request logs is TSV, one line per request, with five fields: timestamp, remote\_addr, native\_query, query\_context, sql\_query.
@@ -581,6 +581,23 @@ This deep storage is used to interface with Cassandra. You
must load the `druid-
|`druid.storage.keyspace`|Cassandra key space.|none|
+#### Centralized datasource schema
+
+Centralized datasource schema is an [experimental feature](../development/experimental.md) to centralize datasource schema building within the Coordinator.
+Traditionally, the datasource schema is built in the Brokers by combining the schemas of all available segments of a datasource.
+Brokers issue segment metadata queries to data nodes and tasks to fetch segment schemas.
+In the new arrangement, tasks publish segment schemas along with segment metadata to the database, and schemas for realtime segments are periodically pushed to the Coordinator in the segment announcement flow.
+This enables the Coordinator to cache segment schemas and build the datasource schema by combining them.
+Brokers query the datasource schema from the Coordinator, while retaining the ability to build the table schema if the need arises.
+
+|Property|Description|Default|Required|
+|-----|-----------|-------|--------|
+|`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling datasource schema building in the Coordinator. This should be specified in the common runtime properties.|false|No|
+|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`|This config should be set when the Centralized Datasource Schema feature is enabled. It should be specified in the MiddleManager runtime properties.|false|No|
+
+For stale schema cleanup configs, refer to properties with the prefix `druid.coordinator.kill.segmentSchema` in [Metadata Management](#metadata-management).
+
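Taken together, the two flags above would be set roughly as follows (file locations are the conventional ones; values illustrative):

```properties
# common.runtime.properties (all services)
druid.centralizedDatasourceSchema.enabled=true

# MiddleManager runtime.properties: propagate the flag to forked peon tasks
druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled=true
```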
### Ingestion security configuration
#### HDFS input source
@@ -878,7 +895,6 @@ These Coordinator static configurations can be defined in
the `coordinator/runti
|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop
requests to batch in one HTTP request. Note that it must be smaller than
`druid.segmentCache.numLoadingThreads` config on Historical service.|1|
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this
Coordinator service should act like an Overlord as well. This configuration
allows users to simplify a Druid cluster by not having to deploy any standalone
Overlord services. If set to true, then Overlord console is available at
`http://coordinator-host:port/console.html` and be sure to set
`druid.coordinator.asOverlord.overlordService` also.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if
`druid.coordinator.asOverlord.enabled` is `true`. This must be same value as
`druid.service` on standalone Overlord services and
`druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
-|`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling
datasource schema building on the Coordinator. Note, when using MiddleManager
to launch task, set
`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled` in
MiddleManager runtime config. |false|
##### Metadata management
@@ -899,6 +915,9 @@ These Coordinator static configurations can be defined in
the `coordinator/runti
|`druid.coordinator.kill.datasource.on`| Boolean value for whether to enable
automatic deletion of datasource metadata (Note: datasource metadata only
exists for datasource created from supervisor). If set to true, Coordinator
will periodically remove datasource metadata of terminated supervisor from the
datasource table in metadata storage. | No | True|
|`druid.coordinator.kill.datasource.period`| How often to do automatic
deletion of datasource metadata in [ISO
8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be
equal to or greater than
`druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if
`druid.coordinator.kill.datasource.on` is set to true.| No| `P1D`|
|`druid.coordinator.kill.datasource.durationToRetain`| Duration of datasource
metadata to be retained from created time in [ISO
8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if
`druid.coordinator.kill.datasource.on` is set to true.| Yes if
`druid.coordinator.kill.datasource.on` is set to true.| `P90D`|
+|`druid.coordinator.kill.segmentSchema.on`| Boolean value for whether to enable automatic deletion of unused segment schemas. If set to true, the Coordinator periodically identifies segment schemas which are not referenced by any used segment and marks them as unused. At a later point, these unused schemas are deleted. Only applies if the [Centralized Datasource schema](#centralized-datasource-schema) feature is enabled. | No | True|
+|`druid.coordinator.kill.segmentSchema.period`| How often to do automatic deletion of segment schemas in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| No| `P1D`|
+|`druid.coordinator.kill.segmentSchema.durationToRetain`| Duration for which segment schemas are retained from the time they are marked as unused, in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| Yes, if `druid.coordinator.kill.segmentSchema.on` is set to true.| `P90D`|
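The three cleanup properties above combine as, for example (values illustrative; the period and retention shown are the documented defaults):

```properties
# coordinator runtime.properties: clean up unreferenced segment schemas
druid.coordinator.kill.segmentSchema.on=true
druid.coordinator.kill.segmentSchema.period=P1D
druid.coordinator.kill.segmentSchema.durationToRetain=P90D
```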
##### Segment management
@@ -1428,7 +1447,7 @@ MiddleManagers pass their configurations down to their
child peons. The MiddleMa
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one
of which is assigned per task in a round-robin fashion. This property can be
used to allow usage of multiple disks for indexing. This property is
recommended in place of and takes precedence over
`${druid.indexer.task.baseTaskDir}`. If this configuration is not set,
`${druid.indexer.task.baseTaskDir}` is used. For example,
`druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by
tasks on any single task dir. This value is treated symmetrically across all
directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then
each of those task directories is assumed to allow for 500 GB to be used and a
total of 1.5 TB will potentially be available across all tasks. The actual
amount of memory assigned to each task is discussed in [Configuring task
storage sizes](../ingestion/tasks [...]
|`druid.worker.category`|A string to name the category that the MiddleManager
node belongs to.|`_default_worker_category`|
-|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. |false|
+|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when the [Centralized Datasource Schema](#centralized-datasource-schema) feature is enabled. |false|
#### Peon processing
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index a877d8b8522..51041952cfa 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -70,17 +70,11 @@ Most metric values reset each emission period, as specified
in `druid.monitoring
|`sqlQuery/bytes`|Number of bytes returned in the SQL query response.|`id`,
`nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`| |
|`serverview/init/time`|Time taken to initialize the broker server view.
Useful to detect if brokers are taking too long to start.||Depends on the
number of segments.|
|`metadatacache/init/time`|Time taken to initialize the broker segment
metadata cache. Useful to detect if brokers are taking too long to
start||Depends on the number of segments.|
-|`metadatacache/refresh/count`|Number of segments to refresh in broker segment
metadata cache.|`dataSource`|
-|`metadatacache/refresh/time`|Time taken to refresh segments in broker segment
metadata cache.|`dataSource`|
-|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch
datasource schema.||
-|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch
datasource schema.||
-|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch
datasource schema.||
-|`metadatacache/backfill/count`|Number of segments for which schema was back
filled in the database.|`dataSource`|
-|`schemacache/realtime/count`|Number of realtime segments for which schema is
cached.||Depends on the number of realtime segments.|
-|`schemacache/finalizedSegmentMetadata/count`|Number of finalized segments for
which schema metadata is cached.||Depends on the number of segments in the
cluster.|
-|`schemacache/finalizedSchemaPayload/count`|Number of finalized segment schema
cached.||Depends on the number of distinct schema in the cluster.|
-|`schemacache/inTransitSMQResults/count`|Number of segments for which schema
was fetched by executing segment metadata query.||Eventually it should be 0.|
-|`schemacache/inTransitSMQPublishedResults/count`|Number of segments for which
schema is cached after back filling in the database.||Eventually it should be
0.|
+|`metadatacache/refresh/count`|Number of segments to refresh in broker segment
metadata cache.|`dataSource`||
+|`metadatacache/refresh/time`|Time taken to refresh segments in broker segment
metadata cache.|`dataSource`||
+|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch
datasource schema.|||
+|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch
datasource schema.|||
+|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch
datasource schema.|||
|`serverview/sync/healthy`|Sync status of the Broker with a segment-loading
server such as a Historical or Peon. Emitted only when [HTTP-based server
view](../configuration/index.md#segment-management) is enabled. This metric can
be used in conjunction with `serverview/sync/unstableTime` to debug slow
startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has
been failing to sync with a segment-loading server. Emitted only when
[HTTP-based server view](../configuration/index.md#segment-management) is
enabled.|`server`, `tier`|Not emitted for synced servers.|
|`subquery/rowLimit/count`|Number of subqueries whose results are materialized
as rows (Java objects on heap).|This metric is only available if the
`SubqueryCountStatsMonitor` module is included.| |
@@ -375,8 +369,14 @@ These metrics are for the Druid Coordinator and are reset
each time the Coordina
|`serverview/sync/healthy`|Sync status of the Coordinator with a
segment-loading server such as a Historical or Peon. Emitted only when
[HTTP-based server view](../configuration/index.md#segment-management) is
enabled. You can use this metric in conjunction with
`serverview/sync/unstableTime` to debug slow startup of the
Coordinator.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`serverview/sync/unstableTime`|Time in milliseconds for which the Coordinator
has been failing to sync with a segment-loading server. Emitted only when
[HTTP-based server view](../configuration/index.md#segment-management) is
enabled.|`server`, `tier`|Not emitted for synced servers.|
|`metadatacache/init/time`|Time taken to initialize the coordinator segment
metadata cache.||Depends on the number of segments.|
-|`metadatacache/refresh/count`|Number of segments to refresh in coordinator
segment metadata cache.|`dataSource`|
-|`metadatacache/refresh/time`|Time taken to refresh segments in coordinator
segment metadata cache.|`dataSource`|
+|`metadatacache/refresh/count`|Number of segments to refresh in coordinator
segment metadata cache.|`dataSource`||
+|`metadatacache/refresh/time`|Time taken to refresh segments in coordinator
segment metadata cache.|`dataSource`||
+|`metadatacache/backfill/count`|Number of segments for which the schema was backfilled in the database.|`dataSource`||
+|`metadatacache/realtimeSegmentSchema/count`|Number of realtime segments for which the schema is cached.||Depends on the number of realtime segments in the cluster.|
+|`metadatacache/finalizedSegmentMetadata/count`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.|
+|`metadatacache/finalizedSchemaPayload/count`|Number of finalized segment schemas cached.||Depends on the number of distinct schemas in the cluster.|
+|`metadatacache/temporaryMetadataQueryResults/count`|Number of segments for which the schema was fetched by executing a segment metadata query.||Eventually this should be 0.|
+|`metadatacache/temporaryPublishedMetadataQueryResults/count`|Number of segments for which the schema is cached after backfilling in the database.||This value resets after each database poll. Eventually it should be 0.|
## General Health
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 4bcc8c5d39f..c2f542b096e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -78,8 +78,13 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
@Nullable SegmentSchemaMapping segmentSchemaMapping
)
{
- return new SegmentTransactionalInsertAction(segmentsToBeOverwritten,
segmentsToPublish, null, null, null,
- segmentSchemaMapping
+ return new SegmentTransactionalInsertAction(
+ segmentsToBeOverwritten,
+ segmentsToPublish,
+ null,
+ null,
+ null,
+ segmentSchemaMapping
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 9fe3b78ee2d..5a17c4379f1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -443,8 +443,10 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
case APPEND:
return SegmentTransactionalAppendAction.forSegments(segmentsToPublish,
segmentSchemaMapping);
default:
- return
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten,
segmentsToPublish,
-
segmentSchemaMapping
+ return SegmentTransactionalInsertAction.overwriteAction(
+ segmentsToBeOverwritten,
+ segmentsToPublish,
+ segmentSchemaMapping
);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java
index e94ced42193..b4166b7bc29 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java
@@ -116,7 +116,7 @@ public class TestIndexTask extends IndexTask
return status;
}
- public TaskAction<SegmentPublishResult> testBuildPublishAction(
+ public TaskAction<SegmentPublishResult> buildPublishActionForTest(
Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish,
SegmentSchemaMapping segmentSchemaMapping,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 6b093cd745f..7caad45bc33 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -548,6 +548,9 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
}
}
+ /**
+ * Verify that schema is present for each segment.
+ */
public void verifySchema(DataSegmentsWithSchemas dataSegmentsWithSchemas)
{
int nonTombstoneSegments = 0;
@@ -556,11 +559,16 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
continue;
}
nonTombstoneSegments++;
- Assert.assertTrue(dataSegmentsWithSchemas.getSegmentSchemaMapping()
- .getSegmentIdToMetadataMap()
-
.containsKey(segment.getId().toString()));
+ Assert.assertTrue(
+ dataSegmentsWithSchemas.getSegmentSchemaMapping()
+ .getSegmentIdToMetadataMap()
+ .containsKey(segment.getId().toString())
+ );
}
- Assert.assertEquals(nonTombstoneSegments,
dataSegmentsWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size());
+ Assert.assertEquals(
+ nonTombstoneSegments,
+
dataSegmentsWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size()
+ );
}
public TaskReport.ReportMap getReports() throws IOException
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 29ed44f0ad0..f888dd76bf0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -534,7 +534,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
}
}
- public DataSegmentsWithSchemas getPublishedSegments(String taskId)
+ public DataSegmentsWithSchemas getPublishedSegmentsWithSchemas(String
taskId)
{
final TaskContainer taskContainer = tasks.get(taskId);
if (taskContainer == null || taskContainer.actionClient == null) {
@@ -667,7 +667,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
public DataSegmentsWithSchemas getSegmentAndSchemas(Task task)
{
- return taskRunner.getPublishedSegments(task.getId());
+ return taskRunner.getPublishedSegmentsWithSchemas(task.getId());
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index f4cb82dcd71..dec98e05291 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -1164,7 +1164,7 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(
SegmentTransactionalAppendAction.class,
- task.testBuildPublishAction(
+ task.buildPublishActionForTest(
Collections.emptySet(),
Collections.emptySet(),
null,
@@ -1174,7 +1174,7 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(
SegmentTransactionalReplaceAction.class,
- task.testBuildPublishAction(
+ task.buildPublishActionForTest(
Collections.emptySet(),
Collections.emptySet(),
null,
@@ -1184,7 +1184,7 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(
SegmentTransactionalInsertAction.class,
- task.testBuildPublishAction(
+ task.buildPublishActionForTest(
Collections.emptySet(),
Collections.emptySet(),
null,
diff --git
a/integration-tests/docker/docker-compose.cds-coordinator-smq-disabled.yml
b/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
similarity index 100%
rename from
integration-tests/docker/docker-compose.cds-coordinator-smq-disabled.yml
rename to
integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
diff --git a/integration-tests/docker/druid.sh
b/integration-tests/docker/druid.sh
index f112f91d159..d7bd80ca32e 100755
--- a/integration-tests/docker/druid.sh
+++ b/integration-tests/docker/druid.sh
@@ -85,7 +85,7 @@ setupData()
# The "query" and "security" test groups require data to be set up before running the tests.
# In particular, they require segments to be downloaded from a pre-existing s3 bucket.
# This is done by using the loadSpec put into metadatastore and s3 credentials set below.
- if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" [...]
+ if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" [...]
# touch is needed because OverlayFS's copy-up operation breaks POSIX
standards. See https://github.com/docker/for-linux/issues/72.
find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
&& cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql |
mysql -u root druid \
diff --git
a/integration-tests/docker/test-data/cds-coordinator-smq-disabled-sample-data.sql
b/integration-tests/docker/test-data/cds-coordinator-metadata-query-disabled-sample-data.sql
similarity index 100%
rename from
integration-tests/docker/test-data/cds-coordinator-smq-disabled-sample-data.sql
rename to
integration-tests/docker/test-data/cds-coordinator-metadata-query-disabled-sample-data.sql
diff --git a/integration-tests/script/docker_compose_args.sh
b/integration-tests/script/docker_compose_args.sh
index c37d22ca314..477627d300b 100644
--- a/integration-tests/script/docker_compose_args.sh
+++ b/integration-tests/script/docker_compose_args.sh
@@ -79,10 +79,10 @@ getComposeArgs()
then
# cluster with overridden properties for broker and coordinator
echo "-f
${DOCKERDIR}/docker-compose.cds-task-schema-publish-disabled.yml"
- elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-smq-disabled" ]
+ elif [ "$DRUID_INTEGRATION_TEST_GROUP" =
"cds-coordinator-metadata-query-disabled" ]
then
# cluster with overridden properties for broker and coordinator
- echo "-f ${DOCKERDIR}/docker-compose.cds-coordinator-smq-disabled.yml"
+ echo "-f
${DOCKERDIR}/docker-compose.cds-coordinator-metadata-query-disabled.yml"
else
# default
echo "-f ${DOCKERDIR}/docker-compose.yml"
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 516dcb65434..b3417902cef 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -166,5 +166,5 @@ public class TestNGGroup
public static final String CDS_TASK_SCHEMA_PUBLISH_DISABLED =
"cds-task-schema-publish-disabled";
- public static final String CDS_COORDINATOR_SMQ_DISABLED =
"cds-coordinator-smq-disabled";
+ public static final String CDS_COORDINATOR_METADATA_QUERY_DISABLED =
"cds-coordinator-metadata-query-disabled";
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java
index 84ddb7612e6..ced35949626 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java
@@ -40,7 +40,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
-@Test(groups = {TestNGGroup.APPEND_INGESTION,
TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED,
TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.APPEND_INGESTION,
TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED,
TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java
index a077dbd0a97..b69ac027675 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java
@@ -44,7 +44,7 @@ import java.util.Map;
/**
* See {@link AbstractITRealtimeIndexTaskTest} for test details.
*/
-@Test(groups = {TestNGGroup.REALTIME_INDEX,
TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED,
TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.REALTIME_INDEX,
TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED,
TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAppenderatorDriverRealtimeIndexTaskTest extends
AbstractITRealtimeIndexTaskTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java
index 0ecfe7ed5c4..be2ab61a5f5 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java
@@ -38,7 +38,7 @@ import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.function.Function;
-@Test(groups = {TestNGGroup.BATCH_INDEX,
TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED,
TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.BATCH_INDEX,
TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED,
TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITBestEffortRollupParallelIndexTest extends
AbstractITBatchIndexTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java
index 40549a2685e..2af07e017b9 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java
@@ -32,7 +32,7 @@ import java.io.Closeable;
import java.util.Map;
import java.util.function.Function;
-@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITCombiningInputSourceParallelIndexTest extends
AbstractITBatchIndexTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java
index 27b771308b3..58dc7c43ae7 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java
@@ -41,7 +41,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
-@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITCompactionSparseColumnTest extends AbstractIndexerTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 6dbcb90c3df..b974c7d20e9 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -51,7 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITCompactionTaskTest extends AbstractIndexerTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java
index 11404bdd56e..25eb83a7c28 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java
@@ -29,7 +29,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.UUID;
-@Test(groups = {TestNGGroup.INPUT_SOURCE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.INPUT_SOURCE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITHttpInputSourceTest extends AbstractITBatchIndexTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index 77c64733a62..f527135c80d 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -41,7 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Function;
-@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITIndexerTest extends AbstractITBatchIndexTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
index 33bf5a5d79b..5df5a708bc5 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
@@ -25,7 +25,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
-@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends
AbstractKafkaIndexingServiceTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
index a50aa6ce10e..424d3c67068 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
@@ -25,7 +25,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
-@Test(groups = {TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaIndexingServiceTransactionalSerializedTest extends
AbstractKafkaIndexingServiceTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java
index 0cc47b9bc63..5e7678e8b6b 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java
@@ -30,7 +30,7 @@ import org.testng.annotations.Test;
import java.util.List;
import java.util.Map;
-@Test(groups = {TestNGGroup.INPUT_FORMAT, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.INPUT_FORMAT, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITLocalInputSourceAllInputFormatTest extends
AbstractLocalInputSourceParallelIndexTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java
index e81cf74b457..c8d54e6beac 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java
@@ -33,7 +33,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
-@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITOverwriteBatchIndexTest extends AbstractITBatchIndexTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java
index ddae46b18dd..0e8fc904949 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java
@@ -36,7 +36,7 @@ import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.function.Function;
-@Test(groups = {TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX, TestNGGroup.SHUFFLE_DEEP_STORE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX, TestNGGroup.SHUFFLE_DEEP_STORE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java
index 6dc2988c3e0..be3f518b098 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java
@@ -44,7 +44,7 @@ import java.util.Map;
/**
* See {@link AbstractITRealtimeIndexTaskTest} for test details.
*/
-@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest
{
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java
index e5f60d87c48..f15081509a0 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java
@@ -30,7 +30,7 @@ import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.function.Function;
-@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED})
+@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITSystemTableBatchIndexTaskTest extends AbstractITBatchIndexTest
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchema.java
b/processing/src/main/java/org/apache/druid/segment/DataSegmentWithMetadata.java
similarity index 95%
rename from
processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchema.java
rename to
processing/src/main/java/org/apache/druid/segment/DataSegmentWithMetadata.java
index b82b4d26653..e2c1729ed38 100644
---
a/processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchema.java
+++
b/processing/src/main/java/org/apache/druid/segment/DataSegmentWithMetadata.java
@@ -26,7 +26,7 @@ import javax.annotation.Nullable;
/**
* Immutable wrapper class for segment and schema.
*/
-public class DataSegmentWithSchema
+public class DataSegmentWithMetadata
{
@Nullable
private final DataSegment dataSegment;
@@ -34,7 +34,7 @@ public class DataSegmentWithSchema
@Nullable
private final SchemaPayloadPlus schemaPayloadPlus;
- public DataSegmentWithSchema(
+ public DataSegmentWithMetadata(
@Nullable DataSegment dataSegment,
@Nullable SchemaPayloadPlus schemaPayloadPlus
)
diff --git
a/processing/src/main/java/org/apache/druid/segment/SegmentMetadata.java
b/processing/src/main/java/org/apache/druid/segment/SegmentMetadata.java
index f12a676907e..d367a926ed4 100644
--- a/processing/src/main/java/org/apache/druid/segment/SegmentMetadata.java
+++ b/processing/src/main/java/org/apache/druid/segment/SegmentMetadata.java
@@ -79,9 +79,9 @@ public class SegmentMetadata
@Override
public String toString()
{
- return "SegmentStats{" +
+ return "SegmentMetadata{" +
"numRows=" + numRows +
- ", fingerprint='" + schemaFingerprint + '\'' +
+ ", schemaFingerprint='" + schemaFingerprint + '\'' +
'}';
}
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index cb85548d5ba..cd927b2fef8 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -339,10 +339,10 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
for (String column : columns) {
createStatementBuilder.append(column);
- createStatementBuilder.append(",");
+ createStatementBuilder.append(",\n");
}
- createStatementBuilder.append("PRIMARY KEY (id))");
+ createStatementBuilder.append("PRIMARY KEY (id)\n)");
createTable(
tableName,
@@ -618,10 +618,7 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
log.info("Adding columns %s to table[%s].", columnsToAdd, tableName);
}
- alterTable(
- tableName,
- alterCommands
- );
+ alterTable(tableName, alterCommands);
}
@Override
@@ -1011,7 +1008,7 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
tableName, getSerialType(), getPayloadType()
),
        StringUtils.format("CREATE INDEX idx_%1$s_fingerprint ON %1$s(fingerprint)", tableName),
-        StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used)", tableName)
+        StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used, used_status_last_updated)", tableName)
)
);
}
@@ -1155,7 +1152,7 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
{
String segmentsTables = tablesConfigSupplier.get().getSegmentsTable();
- boolean schemaPersistenceRequirementMet =
+ final boolean schemaPersistenceRequirementMet =
!centralizedDatasourceSchemaConfig.isEnabled() ||
(tableHasColumn(segmentsTables, "schema_fingerprint")
&& tableHasColumn(segmentsTables, "num_rows"));
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
index 9416f8e53fa..48a92ecba4e 100644
---
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
+++
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
@@ -76,7 +76,7 @@ public class SQLMetadataSegmentPublisher implements
MetadataSegmentPublisher
);
}
- void publishSegment(
+ private void publishSegment(
final String segmentId,
final String dataSource,
final String createdDate,
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 7e45a45464c..590a61d78d0 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -1177,7 +1177,7 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
})
.list();
- segmentSchemaCache.resetInTransitSMQResultPublishedOnDBPoll();
+          segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
return null;
});
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 20417414677..dad0b78ea77 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -68,7 +68,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* The schema is merged with any existing schema for the segment and the cache
is updated.
* Corresponding datasource is marked for refresh.</li>
* <li>The refresh mechanism is significantly different from the other
implementation,
- * <ul><li>SMQ is executed only for those non-realtime segments for which the schema is not cached.</li>
+ * <ul><li>A metadata query is executed only for those non-realtime segments for which the schema is not cached.</li>
* <li>Datasources marked for refresh are then rebuilt.</li></ul>
* </li>
*/
@@ -265,7 +265,7 @@ public class CoordinatorSegmentMetadataCache extends
AbstractSegmentMetadataCach
log.debug("Publishing segment schema. SegmentId [%s],
RowSignature [%s], numRows [%d]", segmentId, rowSignature, numRows);
Map<String, AggregatorFactory> aggregators =
analysis.getAggregators();
// cache the signature
-            segmentSchemaCache.addInTransitSMQResult(segmentId, rowSignature, aggregators, numRows);
+            segmentSchemaCache.addTemporaryMetadataQueryResult(segmentId, rowSignature, aggregators, numRows);
// queue the schema for publishing to the DB
segmentSchemaBackfillQueue.add(segmentId, rowSignature,
aggregators, numRows);
added.set(true);
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
index c13a305da18..ffaccd09bdf 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
@@ -58,8 +58,15 @@ public class FingerprintGenerator
final Hasher hasher = Hashing.sha256().newHasher();
hasher.putBytes(objectMapper.writeValueAsBytes(schemaPayload));
+      // add delimiter, inspired by org.apache.druid.metadata.PendingSegmentRecord.computeSequenceNamePrevIdSha1
+ hasher.putByte((byte) 0xff);
+
hasher.putBytes(StringUtils.toUtf8(dataSource));
+ hasher.putByte((byte) 0xff);
+
hasher.putBytes(Ints.toByteArray(version));
+ hasher.putByte((byte) 0xff);
+
return BaseEncoding.base16().encode(hasher.hash().asBytes());
}
catch (IOException e) {
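The delimiter bytes added in this hunk guard against field-boundary ambiguity: when variable-length fields are concatenated before hashing, distinct inputs can feed the hasher an identical byte stream. A minimal sketch of the failure mode, using the JDK's MessageDigest in place of Guava's Hasher (class and method names here are illustrative; 0xff is a safe separator for UTF-8 fields because it never occurs in UTF-8 output):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class FingerprintDelimiterDemo {
    // Hash two fields, optionally inserting a 0xff delimiter between them.
    static byte[] fingerprint(String payload, String dataSource, boolean delimited) {
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            sha.update(payload.getBytes(StandardCharsets.UTF_8));
            if (delimited) {
                sha.update((byte) 0xff); // field separator, as in the hunk above
            }
            sha.update(dataSource.getBytes(StandardCharsets.UTF_8));
            return sha.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Without a delimiter, ("ab","c") and ("a","bc") feed identical byte
        // streams to the hasher, so their fingerprints collide.
        System.out.println(Arrays.equals(fingerprint("ab", "c", false), fingerprint("a", "bc", false)));
        // With the delimiter, the field boundary is preserved and they differ.
        System.out.println(Arrays.equals(fingerprint("ab", "c", true), fingerprint("a", "bc", true)));
    }
}
```

The same reasoning applies to the dataSource and version fields hashed above: without separators, shifting bytes between adjacent fields could yield the same fingerprint for different schemas.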
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
index bb787c83c8e..c2995e3087e 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
@@ -178,12 +178,12 @@ public class SegmentSchemaBackFillQueue
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(),
entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
// Mark the segments as published in the cache.
for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
-          segmentSchemaCache.markInTransitSMQResultPublished(plus.getSegmentId());
+          segmentSchemaCache.markInMetadataQueryResultPublished(plus.getSegmentId());
}
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource",
entry.getKey())
-                          .setMetric("metadatacache/backfill/count", polled.size())
+                          .setMetric("metadatacache/backfill/count", entry.getValue().size())
);
}
catch (Exception e) {
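The second change in this hunk fixes the emitted value: one `metadatacache/backfill/count` event is emitted per datasource, but the value was `polled.size()`, the total across all datasources, rather than that datasource's batch size. A small illustrative sketch of the difference (generic names, not Druid's API):

```java
import java.util.List;
import java.util.Map;

public class BackfillCountDemo {
    // Sum of all batch sizes, i.e. what polled.size() effectively reported.
    static int totalCount(Map<String, List<String>> batchesByDataSource) {
        return batchesByDataSource.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        Map<String, List<String>> batchesByDataSource = Map.of(
            "wiki", List.of("seg-1", "seg-2"),
            "metrics", List.of("seg-3")
        );
        for (Map.Entry<String, List<String>> entry : batchesByDataSource.entrySet()) {
            // Correct per-datasource value: 2 for "wiki", 1 for "metrics".
            int perDataSource = entry.getValue().size();
            // Buggy value: 3, regardless of which datasource dimension is set.
            int total = totalCount(batchesByDataSource);
            System.out.println(entry.getKey() + ": " + perDataSource + " (not " + total + ")");
        }
    }
}
```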
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
index 2ba8aee29cb..e2fb1681792 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
@@ -49,10 +49,10 @@ import java.util.concurrent.atomic.AtomicReference;
* Additionally, this class caches schema for realtime segments in {@link
SegmentSchemaCache#realtimeSegmentSchema}. This mapping
* is cleared either when the segment is removed or marked as finalized.
* <p>
- * Finalized segments which do not have their schema information present in the DB, fetch their schema via SMQ.
- * SMQ results are cached in {@link SegmentSchemaCache#inTransitSMQResults}. Once the schema information is backfilled
- * in the DB, it is removed from {@link SegmentSchemaCache#inTransitSMQResults} and added to {@link SegmentSchemaCache#inTransitSMQPublishedResults}.
- * {@link SegmentSchemaCache#inTransitSMQPublishedResults} is cleared on each successfull DB poll.
+ * Finalized segments whose schema information is not present in the DB fetch their schema via a metadata query.
+ * Metadata query results are cached in {@link SegmentSchemaCache#temporaryMetadataQueryResults}. Once the schema information is backfilled
+ * in the DB, it is removed from {@link SegmentSchemaCache#temporaryMetadataQueryResults} and added to {@link SegmentSchemaCache#temporaryPublishedMetadataQueryResults}.
+ * {@link SegmentSchemaCache#temporaryPublishedMetadataQueryResults} is cleared on each successful DB poll.
* <p>
* {@link CoordinatorSegmentMetadataCache} uses this cache to fetch schema for
a segment.
* <p>
@@ -81,19 +81,19 @@ public class SegmentSchemaCache
private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
realtimeSegmentSchema = new ConcurrentHashMap<>();
/**
- * If the segment schema is fetched via SMQ, subsequently it is added here.
+ * If the segment schema is fetched via a segment metadata query, it is subsequently added here.
* The mapping is removed when the schema information is backfilled in the
DB.
*/
-  private final ConcurrentMap<SegmentId, SchemaPayloadPlus> inTransitSMQResults = new ConcurrentHashMap<>();
+  private final ConcurrentMap<SegmentId, SchemaPayloadPlus> temporaryMetadataQueryResults = new ConcurrentHashMap<>();
/**
* Once the schema information is backfilled in the DB, it is added here.
* This map is cleared after each DB poll.
* After the DB poll and before clearing this map it is possible that some
results were added to this map.
* These results would get lost after clearing this map.
- * But, it should be fine since the schema could be retrieved if needed using SMQ, also the schema would be available in the next poll.
+ * But this should be fine, since the schema can be retrieved via a metadata query if needed, and it will also be available in the next poll.
*/
-  private final ConcurrentMap<SegmentId, SchemaPayloadPlus> inTransitSMQPublishedResults = new ConcurrentHashMap<>();
+  private final ConcurrentMap<SegmentId, SchemaPayloadPlus> temporaryPublishedMetadataQueryResults = new ConcurrentHashMap<>();
private final ServiceEmitter emitter;
@@ -121,8 +121,8 @@ public class SegmentSchemaCache
initialized.set(new CountDownLatch(1));
finalizedSegmentSchemaInfo = new
FinalizedSegmentSchemaInfo(ImmutableMap.of(), ImmutableMap.of());
- inTransitSMQResults.clear();
- inTransitSMQPublishedResults.clear();
+ temporaryMetadataQueryResults.clear();
+ temporaryPublishedMetadataQueryResults.clear();
}
public boolean isInitialized()
@@ -132,7 +132,7 @@ public class SegmentSchemaCache
/**
* {@link CoordinatorSegmentMetadataCache} startup waits on the cache
initialization.
- * This is being done to ensure that we don't execute SMQ for segment with schema already present in the DB.
+ * This ensures that we don't execute a metadata query for a segment whose schema is already present in the DB.
*/
public void awaitInitialization() throws InterruptedException
{
@@ -157,44 +157,44 @@ public class SegmentSchemaCache
}
/**
- * Cache SMQ result. This entry is cleared when SMQ result is published to the DB.
+ * Cache a metadata query result. This entry is cleared when the result is published to the DB.
*/
- public void addInTransitSMQResult(
+ public void addTemporaryMetadataQueryResult(
SegmentId segmentId,
RowSignature rowSignature,
Map<String, AggregatorFactory> aggregatorFactories,
long numRows
)
{
-    inTransitSMQResults.put(segmentId, new SchemaPayloadPlus(new SchemaPayload(rowSignature, aggregatorFactories), numRows));
+    temporaryMetadataQueryResults.put(segmentId, new SchemaPayloadPlus(new SchemaPayload(rowSignature, aggregatorFactories), numRows));
}
/**
- * After, SMQ result is published to the DB, it is removed from the {@code inTransitSMQResults}
- * and added to {@code inTransitSMQPublishedResults}.
+ * After the metadata query result is published to the DB, it is removed from {@code temporaryMetadataQueryResults}
+ * and added to {@code temporaryPublishedMetadataQueryResults}.
*/
- public void markInTransitSMQResultPublished(SegmentId segmentId)
+ public void markInMetadataQueryResultPublished(SegmentId segmentId)
{
- if (!inTransitSMQResults.containsKey(segmentId)) {
-      log.error("SegmentId [%s] not found in InTransitSMQResultPublished map.", segmentId);
+    if (!temporaryMetadataQueryResults.containsKey(segmentId)) {
+      log.error("SegmentId [%s] not found in temporaryMetadataQueryResults map.", segmentId);
}
-    inTransitSMQPublishedResults.put(segmentId, inTransitSMQResults.get(segmentId));
-    inTransitSMQResults.remove(segmentId);
+    temporaryPublishedMetadataQueryResults.put(segmentId, temporaryMetadataQueryResults.get(segmentId));
+ temporaryMetadataQueryResults.remove(segmentId);
}
/**
- * {@code inTransitSMQPublishedResults} is reset on each DB poll.
+ * {@code temporaryPublishedMetadataQueryResults} is reset after each DB poll.
*/
- public void resetInTransitSMQResultPublishedOnDBPoll()
+ public void resetTemporaryPublishedMetadataQueryResultOnDBPoll()
{
- inTransitSMQPublishedResults.clear();
+ temporaryPublishedMetadataQueryResults.clear();
}
/**
- * Fetch schema for a given segment. Note, since schema corresponding to the current schema version in
- * {@link CentralizedDatasourceSchemaConfig#SCHEMA_VERSION} is cached, there is no check on version here.
- * Any change in version would require a service restart, so we will never end up with multi version schema.
+ * Fetch schema for a given segment. Note that there is no check on schema version in this method,
+ * since only the schema corresponding to {@link CentralizedDatasourceSchemaConfig#SCHEMA_VERSION} is cached.
+ * Any change in version would require a service restart, so this cache will never hold schemas for multiple versions.
*/
public Optional<SchemaPayloadPlus> getSchemaForSegment(SegmentId segmentId)
{
@@ -208,18 +208,18 @@ public class SegmentSchemaCache
return Optional.of(payloadPlus);
}
-    // it is important to lookup {@code inTransitSMQResults} before {@code inTransitSMQPublishedResults}
+    // it is important to look up temporaryMetadataQueryResults before temporaryPublishedMetadataQueryResults
    // other way round, if a segment schema is just published it is possible that the schema is missing
-    // in {@code inTransitSMQPublishedResults} and by the time we check {@code inTransitSMQResults} it is removed.
+    // in temporaryPublishedMetadataQueryResults and by the time we check temporaryMetadataQueryResults it is removed.
- // segment schema has been fetched via SMQ
- payloadPlus = inTransitSMQResults.get(segmentId);
+ // segment schema has been fetched via metadata query
+ payloadPlus = temporaryMetadataQueryResults.get(segmentId);
if (payloadPlus != null) {
return Optional.of(payloadPlus);
}
-    // segment schema has been fetched via SMQ and the schema has been published to the DB
-    payloadPlus = inTransitSMQPublishedResults.get(segmentId);
+    // segment schema has been fetched via metadata query and the schema has been published to the DB
+ payloadPlus = temporaryPublishedMetadataQueryResults.get(segmentId);
if (payloadPlus != null) {
return Optional.of(payloadPlus);
}
@@ -247,8 +247,8 @@ public class SegmentSchemaCache
public boolean isSchemaCached(SegmentId segmentId)
{
return realtimeSegmentSchema.containsKey(segmentId) ||
- inTransitSMQResults.containsKey(segmentId) ||
- inTransitSMQPublishedResults.containsKey(segmentId) ||
+ temporaryMetadataQueryResults.containsKey(segmentId) ||
+ temporaryPublishedMetadataQueryResults.containsKey(segmentId) ||
isFinalizedSegmentSchemaCached(segmentId);
}
@@ -278,8 +278,8 @@ public class SegmentSchemaCache
{
// remove the segment from all the maps
realtimeSegmentSchema.remove(segmentId);
- inTransitSMQResults.remove(segmentId);
- inTransitSMQPublishedResults.remove(segmentId);
+ temporaryMetadataQueryResults.remove(segmentId);
+ temporaryPublishedMetadataQueryResults.remove(segmentId);
// Since finalizedSegmentMetadata & finalizedSegmentSchema is updated on
each DB poll,
// there is no need to remove segment from them.
@@ -296,11 +296,31 @@ public class SegmentSchemaCache
public void emitStats()
{
-    emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/realtime/count", realtimeSegmentSchema.size()));
-    emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/finalizedSegmentMetadata/count", getSegmentMetadataMap().size()));
-    emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/finalizedSchemaPayload/count", getSchemaPayloadMap().size()));
-    emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/inTransitSMQResults/count", inTransitSMQResults.size()));
-    emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/inTransitSMQPublishedResults/count", inTransitSMQPublishedResults.size()));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/realtimeSegmentSchema/count", realtimeSegmentSchema.size()));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/finalizedSegmentMetadata/count", getSegmentMetadataMap().size()));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/finalizedSchemaPayload/count", getSchemaPayloadMap().size()));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/temporaryMetadataQueryResults/count", temporaryMetadataQueryResults.size()));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/temporaryPublishedMetadataQueryResults/count", temporaryPublishedMetadataQueryResults.size()));
}
/**
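The lookup-order comment in getSchemaForSegment above encodes a small lock-free invariant: markInMetadataQueryResultPublished writes to the published map before removing from the pending map, so a reader that checks the maps in the same order the writer vacates them can never observe the entry in neither. A hedged sketch of the pattern with generic names (not Druid's classes):

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative two-map handoff, modeled on the cache described above.
class TwoMapCache<K, V> {
    private final ConcurrentMap<K, V> pending = new ConcurrentHashMap<>();   // results awaiting backfill
    private final ConcurrentMap<K, V> published = new ConcurrentHashMap<>(); // backfilled, cleared on DB poll

    void add(K key, V value) {
        pending.put(key, value);
    }

    void markPublished(K key) {
        V v = pending.get(key);
        if (v != null) {
            // Insert into published BEFORE removing from pending, so the entry
            // is visible in at least one map at every instant.
            published.put(key, v);
            pending.remove(key);
        }
    }

    Optional<V> get(K key) {
        // Read pending first. Missing the entry in pending means it was already
        // removed, which happens only after it was added to published. Reading
        // published first could miss the entry in both maps mid-handoff.
        V v = pending.get(key);
        if (v == null) {
            v = published.get(key);
        }
        return Optional.ofNullable(v);
    }

    public static void main(String[] args) {
        TwoMapCache<String, Integer> cache = new TwoMapCache<>();
        cache.add("segment-1", 42);
        cache.markPublished("segment-1");
        System.out.println(cache.get("segment-1")); // Optional[42]
    }
}
```

Clearing the published map on each DB poll, as the commit does, is safe under this scheme: anything lost in the clear is either already visible via the poll or can be re-fetched with a metadata query.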
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 6a67d4818ea..764d7239736 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -55,7 +55,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.BaseProgressIndicator;
-import org.apache.druid.segment.DataSegmentWithSchema;
+import org.apache.druid.segment.DataSegmentWithMetadata;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.QueryableIndex;
@@ -796,16 +796,16 @@ public class AppenderatorImpl implements Appenderator
continue;
}
- final DataSegmentWithSchema dataSegmentWithSchema = mergeAndPush(
+          final DataSegmentWithMetadata dataSegmentWithMetadata = mergeAndPush(
entry.getKey(),
entry.getValue(),
useUniquePath
);
-      if (dataSegmentWithSchema != null) {
-        DataSegment segment = dataSegmentWithSchema.getDataSegment();
+      if (dataSegmentWithMetadata != null) {
+        DataSegment segment = dataSegmentWithMetadata.getDataSegment();
         dataSegments.add(segment);
-        SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithSchema.getSegmentSchemaMetadata();
+        SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithMetadata.getSegmentSchemaMetadata();
         if (schemaPayloadPlus != null) {
           SchemaPayload schemaPayload = schemaPayloadPlus.getSchemaPayload();
segmentSchemaMapping.addSchema(
@@ -854,7 +854,7 @@ public class AppenderatorImpl implements Appenderator
* @return segment descriptor, or null if the sink is no longer valid
*/
@Nullable
- private DataSegmentWithSchema mergeAndPush(
+ private DataSegmentWithMetadata mergeAndPush(
final SegmentIdWithShardSpec identifier,
final Sink sink,
final boolean useUniquePath
@@ -898,7 +898,7 @@ public class AppenderatorImpl implements Appenderator
);
} else {
log.info("Segment[%s] already pushed, skipping.", identifier);
- return new DataSegmentWithSchema(
+ return new DataSegmentWithMetadata(
objectMapper.readValue(descriptorFile, DataSegment.class),
centralizedDatasourceSchemaConfig.isEnabled() ?
TaskSegmentSchemaUtil.getSegmentSchema(
mergedTarget,
@@ -1017,7 +1017,7 @@ public class AppenderatorImpl implements Appenderator
objectMapper.writeValueAsString(segment.getLoadSpec())
);
- return new DataSegmentWithSchema(
+ return new DataSegmentWithMetadata(
segment,
centralizedDatasourceSchemaConfig.isEnabled()
? TaskSegmentSchemaUtil.getSegmentSchema(mergedTarget, indexIO)
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index 2f973f63bf5..128de15196d 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -50,7 +50,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.BaseProgressIndicator;
-import org.apache.druid.segment.DataSegmentWithSchema;
+import org.apache.druid.segment.DataSegmentWithMetadata;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.QueryableIndex;
@@ -715,16 +715,16 @@ public class BatchAppenderator implements Appenderator
}
// push it:
-    final DataSegmentWithSchema dataSegmentWithSchema = mergeAndPush(
+    final DataSegmentWithMetadata dataSegmentWithMetadata = mergeAndPush(
         identifier,
         sinkForIdentifier
     );
// record it:
-    if (dataSegmentWithSchema.getDataSegment() != null) {
-      DataSegment segment = dataSegmentWithSchema.getDataSegment();
+    if (dataSegmentWithMetadata.getDataSegment() != null) {
+      DataSegment segment = dataSegmentWithMetadata.getDataSegment();
       dataSegments.add(segment);
-      SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithSchema.getSegmentSchemaMetadata();
+      SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithMetadata.getSegmentSchemaMetadata();
       if (schemaPayloadPlus != null) {
         SchemaPayload schemaPayload = schemaPayloadPlus.getSchemaPayload();
segmentSchemaMapping.addSchema(
@@ -758,7 +758,7 @@ public class BatchAppenderator implements Appenderator
* @param sink sink to push
 * @return segment descriptor along with schema, or null if the sink is no longer valid
*/
- private DataSegmentWithSchema mergeAndPush(
+ private DataSegmentWithMetadata mergeAndPush(
final SegmentIdWithShardSpec identifier,
final Sink sink
)
@@ -793,7 +793,7 @@ public class BatchAppenderator implements Appenderator
if (descriptorFile.exists()) {
// Already pushed.
log.info("Segment[%s] already pushed, skipping.", identifier);
- return new DataSegmentWithSchema(
+ return new DataSegmentWithMetadata(
objectMapper.readValue(descriptorFile, DataSegment.class),
centralizedDatasourceSchemaConfig.isEnabled() ?
TaskSegmentSchemaUtil.getSegmentSchema(
mergedTarget,
@@ -895,7 +895,7 @@ public class BatchAppenderator implements Appenderator
objectMapper.writeValueAsString(segment.getLoadSpec())
);
- return new DataSegmentWithSchema(segment, schemaMetadata);
+ return new DataSegmentWithMetadata(segment, schemaMetadata);
}
catch (Exception e) {
metrics.incrementFailedHandoffs();
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 1c5dd42dd77..7622b6943af 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -57,7 +57,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.BaseProgressIndicator;
-import org.apache.druid.segment.DataSegmentWithSchema;
+import org.apache.druid.segment.DataSegmentWithMetadata;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.QueryableIndex;
@@ -809,15 +809,15 @@ public class StreamAppenderator implements Appenderator
continue;
}
-      final DataSegmentWithSchema dataSegmentWithSchema = mergeAndPush(
+      final DataSegmentWithMetadata dataSegmentWithMetadata = mergeAndPush(
           entry.getKey(),
           entry.getValue(),
           useUniquePath
       );
-      if (dataSegmentWithSchema != null) {
-        DataSegment segment = dataSegmentWithSchema.getDataSegment();
+      if (dataSegmentWithMetadata != null) {
+        DataSegment segment = dataSegmentWithMetadata.getDataSegment();
         dataSegments.add(segment);
-        SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithSchema.getSegmentSchemaMetadata();
+        SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithMetadata.getSegmentSchemaMetadata();
         if (schemaPayloadPlus != null) {
           SchemaPayload schemaPayload = schemaPayloadPlus.getSchemaPayload();
segmentSchemaMapping.addSchema(
@@ -866,7 +866,7 @@ public class StreamAppenderator implements Appenderator
* @return segment descriptor, or null if the sink is no longer valid
*/
@Nullable
- private DataSegmentWithSchema mergeAndPush(
+ private DataSegmentWithMetadata mergeAndPush(
final SegmentIdWithShardSpec identifier,
final Sink sink,
final boolean useUniquePath
@@ -910,7 +910,7 @@ public class StreamAppenderator implements Appenderator
);
} else {
log.info("Segment[%s] already pushed, skipping.", identifier);
- return new DataSegmentWithSchema(
+ return new DataSegmentWithMetadata(
objectMapper.readValue(descriptorFile, DataSegment.class),
centralizedDatasourceSchemaConfig.isEnabled() ?
TaskSegmentSchemaUtil.getSegmentSchema(
mergedTarget,
@@ -988,7 +988,7 @@ public class StreamAppenderator implements Appenderator
objectMapper.writeValueAsString(segment.getLoadSpec())
);
- return new DataSegmentWithSchema(
+ return new DataSegmentWithMetadata(
segment,
centralizedDatasourceSchemaConfig.isEnabled()
? TaskSegmentSchemaUtil.getSegmentSchema(mergedTarget, indexIO)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 816cf44090b..0787bc8f7d4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -71,7 +71,7 @@ import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
import org.apache.druid.server.coordinator.duty.KillRules;
import org.apache.druid.server.coordinator.duty.KillStalePendingSegments;
import org.apache.druid.server.coordinator.duty.KillSupervisors;
-import org.apache.druid.server.coordinator.duty.KillUnreferencedSegmentSchemaDuty;
+import org.apache.druid.server.coordinator.duty.KillUnreferencedSegmentSchema;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.duty.MarkEternityTombstonesAsUnused;
import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
@@ -603,7 +603,7 @@ public class DruidCoordinator
        new KillCompactionConfig(killConfigs.compactionConfigs(), metadataManager.segments(), metadataManager.configs())
);
if (centralizedDatasourceSchemaConfig.isEnabled()) {
-      duties.add(new KillUnreferencedSegmentSchemaDuty(killConfigs.segmentSchemas(), metadataManager.schemas()));
+      duties.add(new KillUnreferencedSegmentSchema(killConfigs.segmentSchemas(), metadataManager.schemas()));
}
return duties;
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDuty.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchema.java
similarity index 91%
rename from server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDuty.java
rename to server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchema.java
index 82ad4cedc77..bfdfb84aaf4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDuty.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchema.java
@@ -49,12 +49,12 @@ import java.util.List;
* </ol>
* </p>
*/
-public class KillUnreferencedSegmentSchemaDuty extends MetadataCleanupDuty
+public class KillUnreferencedSegmentSchema extends MetadataCleanupDuty
{
-  private static final Logger log = new Logger(KillUnreferencedSegmentSchemaDuty.class);
+  private static final Logger log = new Logger(KillUnreferencedSegmentSchema.class);
private final SegmentSchemaManager segmentSchemaManager;
- public KillUnreferencedSegmentSchemaDuty(
+ public KillUnreferencedSegmentSchema(
MetadataCleanupConfig config,
SegmentSchemaManager segmentSchemaManager
)
@@ -70,9 +70,9 @@ public class KillUnreferencedSegmentSchemaDuty extends MetadataCleanupDuty
int unused = segmentSchemaManager.markUnreferencedSchemasAsUnused();
log.info("Marked [%s] unreferenced schemas as unused.", unused);
-    // 2 (repair step): Identify unused schema which are still referenced by segments, make them used.
-    // This case would arise when segment is associated with a schema which turned unused by the previous statement
-    // or the previous run of this duty.
+    // 2 (repair step): Find unused schema which are still referenced by segments, make them used.
+    // This case would arise when segment is associated with a schema which was marked unused in the previous step
+    // or in the previous run.
    List<String> schemaFingerprintsToUpdate = segmentSchemaManager.findReferencedSchemaMarkedAsUnused();
if (schemaFingerprintsToUpdate.size() > 0) {
segmentSchemaManager.markSchemaAsUsed(schemaFingerprintsToUpdate);
diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
index 73fb07472f5..f166befde73 100644
--- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
+++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
@@ -22,16 +22,12 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.DateTime;
-
-import java.io.IOException;
public class SqlSegmentsMetadataManagerTestBase
{
@@ -83,31 +79,4 @@ public class SqlSegmentsMetadataManagerTestBase
"wikipedia/index/y=2012/m=01/d=05/2012-01-06T22:19:12.565Z/0/index.zip",
0
);
-
- protected void publish(DataSegment segment, boolean used) throws IOException
- {
- publish(segment, used, DateTimes.nowUtc());
- }
-
-  protected void publish(DataSegment segment, boolean used, DateTime usedFlagLastUpdated) throws IOException
- {
- boolean partitioned = !(segment.getShardSpec() instanceof NoneShardSpec);
-
- String usedFlagLastUpdatedStr = null;
- if (null != usedFlagLastUpdated) {
- usedFlagLastUpdatedStr = usedFlagLastUpdated.toString();
- }
- publisher.publishSegment(
- segment.getId().toString(),
- segment.getDataSource(),
- DateTimes.nowUtc().toString(),
- segment.getInterval().getStart().toString(),
- segment.getInterval().getEnd().toString(),
- partitioned,
- segment.getVersion(),
- used,
- jsonMapper.writeValueAsBytes(segment),
- usedFlagLastUpdatedStr
- );
- }
}
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 9e099a58784..e5b6db1d42d 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -1590,7 +1590,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
schema.onLeaderStart();
schema.awaitInitialization();
-    // verify SMQ is not executed, since the schema is already cached
+    // verify metadata query is not executed, since the schema is already cached
Assert.assertEquals(0, refreshCount.get());
// verify that datasource schema is built
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java
index a1a49d91bbb..09358550802 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java
@@ -51,7 +51,7 @@ public class FingerprintGeneratorTest
    SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap);
-    String expected = "FB7E8AD8F2B96E58ACB99F43E380106D134774B1F5C56641268539FBADB897B3";
+    String expected = "DEE5E8F59833102F0FA5B10F8B8884EA15220D1D2A5F6097A93D8309132E1039";
     Assert.assertEquals(expected, fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0));
}
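The expected fingerprint changes here because the commit adds a delimiter while computing the schema fingerprint hash (per the commit message). The motivation is the classic concatenation ambiguity: hashing concatenated fields without a separator lets distinct field tuples produce identical input bytes. A minimal illustration using plain SHA-256 (not Druid's `FingerprintGenerator`, and `|` is just an example delimiter):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class DelimiterSketch
{
  // Hex-encoded SHA-256 of a string.
  static String sha256Hex(String s) throws Exception
  {
    byte[] d = MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8));
    StringBuilder sb = new StringBuilder();
    for (byte b : d) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  public static void main(String[] args) throws Exception
  {
    // Without a delimiter, the field tuples ("ab", "c") and ("a", "bc")
    // concatenate to the same bytes, so their hashes collide.
    System.out.println(sha256Hex("ab" + "c").equals(sha256Hex("a" + "bc")));

    // With a delimiter between fields, the two tuples stay distinct.
    System.out.println(sha256Hex("ab" + "|" + "c").equals(sha256Hex("a" + "|" + "bc")));
  }
}
```

Changing the hash input in this way is why the test's hard-coded expected fingerprint had to be updated.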
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
index 9b7ddde4b7e..234b16bd9b5 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java
@@ -63,20 +63,20 @@ public class SegmentSchemaCacheTest
     RowSignature rowSignature = RowSignature.builder().add("cx", ColumnType.FLOAT).build();
     SchemaPayloadPlus expected = new SchemaPayloadPlus(new SchemaPayload(rowSignature, Collections.emptyMap()), 20L);
     SegmentId id = SegmentId.dummy("ds");
-    cache.addInTransitSMQResult(id, rowSignature, Collections.emptyMap(), 20);
+    cache.addTemporaryMetadataQueryResult(id, rowSignature, Collections.emptyMap(), 20);
Assert.assertTrue(cache.isSchemaCached(id));
Optional<SchemaPayloadPlus> schema = cache.getSchemaForSegment(id);
Assert.assertTrue(schema.isPresent());
Assert.assertEquals(expected, schema.get());
- cache.markInTransitSMQResultPublished(id);
+ cache.markInMetadataQueryResultPublished(id);
schema = cache.getSchemaForSegment(id);
Assert.assertTrue(schema.isPresent());
Assert.assertEquals(expected, schema.get());
- cache.resetInTransitSMQResultPublishedOnDBPoll();
+ cache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
Assert.assertFalse(cache.isSchemaCached(id));
schema = cache.getSchemaForSegment(id);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDutyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
similarity index 95%
rename from server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDutyTest.java
rename to server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
index a7701c4e104..819c2e9e37d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDutyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
@@ -62,7 +62,7 @@ import java.util.List;
import java.util.Set;
@RunWith(MockitoJUnitRunner.class)
-public class KillUnreferencedSegmentSchemaDutyTest
+public class KillUnreferencedSegmentSchemaTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule =
@@ -109,8 +109,8 @@ public class KillUnreferencedSegmentSchemaDutyTest
Period.parse("PT1H").toStandardDuration(),
Period.parse("PT6H").toStandardDuration()
);
-    KillUnreferencedSegmentSchemaDuty duty =
-        new TestKillUnreferencedSegmentSchemasDuty(cleanupConfig, segmentSchemaManager, dateTimes);
+    KillUnreferencedSegmentSchema duty =
+        new TestKillUnreferencedSegmentSchemas(cleanupConfig, segmentSchemaManager, dateTimes);
Set<DataSegment> segments = new HashSet<>();
    List<SegmentSchemaManager.SegmentSchemaMetadataPlus> schemaMetadataPluses = new ArrayList<>();
@@ -215,8 +215,8 @@ public class KillUnreferencedSegmentSchemaDutyTest
Period.parse("PT6H").toStandardDuration()
);
-    KillUnreferencedSegmentSchemaDuty duty =
-        new TestKillUnreferencedSegmentSchemasDuty(cleanupConfig, segmentSchemaManager, dateTimes);
+    KillUnreferencedSegmentSchema duty =
+        new TestKillUnreferencedSegmentSchemas(cleanupConfig, segmentSchemaManager, dateTimes);
    RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build();
@@ -287,8 +287,8 @@ public class KillUnreferencedSegmentSchemaDutyTest
Period.parse("PT6H").toStandardDuration()
);
-    KillUnreferencedSegmentSchemaDuty duty =
-        new TestKillUnreferencedSegmentSchemasDuty(cleanupConfig, segmentSchemaManager, dateTimes);
+    KillUnreferencedSegmentSchema duty =
+        new TestKillUnreferencedSegmentSchemas(cleanupConfig, segmentSchemaManager, dateTimes);
// create 2 versions of same schema
// unreferenced one should get deleted
@@ -361,12 +361,12 @@ public class KillUnreferencedSegmentSchemaDutyTest
Assert.assertNull(getSchemaUsedStatus(fingerprintOldVersion));
}
-  private static class TestKillUnreferencedSegmentSchemasDuty extends KillUnreferencedSegmentSchemaDuty
+  private static class TestKillUnreferencedSegmentSchemas extends KillUnreferencedSegmentSchema
{
private final List<DateTime> dateTimes;
private int index = -1;
- public TestKillUnreferencedSegmentSchemasDuty(
+ public TestKillUnreferencedSegmentSchemas(
MetadataCleanupConfig config,
SegmentSchemaManager segmentSchemaManager,
List<DateTime> dateTimes
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]