kfaraz commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1578017863
##########
docs/configuration/index.md:
##########
@@ -1435,6 +1435,7 @@ MiddleManagers pass their configurations down to their child peons. The MiddleMa
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. For example, `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by tasks on any single task dir. This value is treated symmetrically across all directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow for 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks. The actual amount of memory assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`|
|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`|
+|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. |false|
Review Comment:
For follow-up PR:
The config description should be more like `Indicates whether centralized schema management is enabled`. The description should also link to the page that contains the details of the feature.
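A sketch of how the revised row might read (the link target below is a placeholder, not an actual docs path — it should point at the feature page once that exists):

```suggestion
|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`|Indicates whether centralized schema management is enabled. See [centralized datasource schema](<link-to-feature-page>) for details.|false|
```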
##########
docs/operations/metrics.md:
##########
@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||
+|`metadatacache/backfill/count`|Number of segments for which schema was backfilled in the database.|`dataSource`|
+|`schemacache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments.|
Review Comment:
Do these rows render correctly? The preceding rows have only 3 columns, while this one seems to have 4.
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1056,12 +1136,19 @@ protected boolean tableHasColumn(String tableName,
String columnName)
*/
private void validateSegmentsTable()
{
- if (tableHasColumn(tablesConfigSupplier.get().getSegmentsTable(),
"used_status_last_updated")) {
+ String segmentsTables = tablesConfigSupplier.get().getSegmentsTable();
Review Comment:
**For follow-up PR:**
```suggestion
final String segmentsTables =
tablesConfigSupplier.get().getSegmentsTable();
```
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -927,6 +973,40 @@ public Void withHandle(Handle handle)
}
}
+ public void createSegmentSchemaTable(final String tableName)
+ {
+ createTable(
+ tableName,
+ ImmutableList.of(
+ StringUtils.format(
+ "CREATE TABLE %1$s (\n"
+ + " id %2$s NOT NULL,\n"
+ + " created_date VARCHAR(255) NOT NULL,\n"
+ + " datasource VARCHAR(255) NOT NULL,\n"
+ + " fingerprint VARCHAR(255) NOT NULL,\n"
+ + " payload %3$s NOT NULL,\n"
+ + " used BOOLEAN NOT NULL,\n"
+ + " used_status_last_updated VARCHAR(255) NOT NULL,\n"
+ + " version INTEGER NOT NULL,\n"
+ + " PRIMARY KEY (id),\n"
+ + " UNIQUE (fingerprint) \n"
+ + ")",
+ tableName, getSerialType(), getPayloadType()
+ ),
+ StringUtils.format("CREATE INDEX idx_%1$s_fingerprint ON
%1$s(fingerprint)", tableName),
+ StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used)",
tableName)
Review Comment:
**For follow-up PR:**
While finding schemas to delete, I think we would also use the column
`used_status_last_updated`. If yes, then that should also be a part of the
index.
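For instance, the `used` index line above could become a composite index (a sketch only; the name keeps the existing `idx_` pattern but hasn't been checked against identifier-length limits in all supported dialects):

```suggestion
        StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used, used_status_last_updated)", tableName)
```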
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of segment schema.
+ * <p>
+ * Internally, a mapping from segmentId to segment-level information like schemaId & numRows is maintained.
+ * This mapping is updated on each database poll {@link SegmentSchemaCache#finalizedSegmentSchemaInfo}.
+ * Segment schemas created since the last DB poll are also fetched and updated in the cache {@code finalizedSegmentSchema}.
+ * <p>
+ * Additionally, this class caches schemas for realtime segments in {@link SegmentSchemaCache#realtimeSegmentSchema}. This mapping
+ * is cleared either when the segment is removed or marked as finalized.
+ * <p>
+ * For finalized segments whose schema information is not present in the DB, the schema is fetched via SMQ (segment metadata query).
+ * SMQ results are cached in {@link SegmentSchemaCache#inTransitSMQResults}. Once the schema information is backfilled
+ * in the DB, it is removed from {@link SegmentSchemaCache#inTransitSMQResults} and added to {@link SegmentSchemaCache#inTransitSMQPublishedResults}.
+ * {@link SegmentSchemaCache#inTransitSMQPublishedResults} is cleared on each successful DB poll.
+ * <p>
+ * {@link CoordinatorSegmentMetadataCache} uses this cache to fetch the schema for a segment.
+ * <p>
+ * Schemas corresponding to the version specified in {@link CentralizedDatasourceSchemaConfig#SCHEMA_VERSION} are cached.
+ */
+@LazySingleton
+public class SegmentSchemaCache
+{
+ private static final Logger log = new Logger(SegmentSchemaCache.class);
+
+ /**
+ * Cache is marked initialized after first DB poll.
+ */
+ private final AtomicReference<CountDownLatch> initialized = new
AtomicReference<>(new CountDownLatch(1));
+
+ /**
+ * Finalized segment schema information.
+ */
+ private volatile FinalizedSegmentSchemaInfo finalizedSegmentSchemaInfo =
+ new FinalizedSegmentSchemaInfo(ImmutableMap.of(), ImmutableMap.of());
+
+ /**
+ * Schema information for realtime segments. This mapping is updated when the schema for a realtime segment is received.
+ * The mapping is removed when the segment is either removed or marked as finalized.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
realtimeSegmentSchema = new ConcurrentHashMap<>();
+
+ /**
+ * If a segment schema is fetched via SMQ, it is subsequently added here.
+ * The mapping is removed once the schema information is backfilled in the DB.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
inTransitSMQResults = new ConcurrentHashMap<>();
+
+ /**
+ * Once the schema information is backfilled in the DB, it is added here.
+ * This map is cleared after each DB poll.
+ * Between the DB poll and the clearing of this map, some results may be added and then lost when the map is cleared.
+ * This should be fine, since the schema can be retrieved via SMQ if needed, and it would also be available in the next poll.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
inTransitSMQPublishedResults = new ConcurrentHashMap<>();
+
+ private final ServiceEmitter emitter;
+
+ @Inject
+ public SegmentSchemaCache(ServiceEmitter emitter)
+ {
+ this.emitter = emitter;
+ }
+
+ public void setInitialized()
+ {
+ if (!isInitialized()) {
+ initialized.get().countDown();
+ log.info("SegmentSchemaCache is initialized.");
+ }
+ }
+
+ /**
+ * This method is called when the current node is no longer the leader.
+ * The schema is cleared except for {@code realtimeSegmentSchemaMap}.
+ * Realtime schema continues to be updated on both the leader and follower
nodes.
+ */
+ public void onLeaderStop()
+ {
+ initialized.set(new CountDownLatch(1));
+
+ finalizedSegmentSchemaInfo = new
FinalizedSegmentSchemaInfo(ImmutableMap.of(), ImmutableMap.of());
+ inTransitSMQResults.clear();
+ inTransitSMQPublishedResults.clear();
+ }
+
+ public boolean isInitialized()
+ {
+ return initialized.get().getCount() == 0;
+ }
+
+ /**
+ * {@link CoordinatorSegmentMetadataCache} startup waits on the cache initialization.
+ * This is done to ensure that we don't execute SMQ for segments whose schema is already present in the DB.
+ */
+ public void awaitInitialization() throws InterruptedException
+ {
+ initialized.get().await();
+ }
+
+ /**
+ * This method is called after each DB poll. It updates the references for the segment metadata and schema maps.
+ */
+ public void updateFinalizedSegmentSchema(FinalizedSegmentSchemaInfo
finalizedSegmentSchemaInfo)
+ {
+ this.finalizedSegmentSchemaInfo = finalizedSegmentSchemaInfo;
+ setInitialized();
+ }
+
+ /**
+ * Cache the schema for a realtime segment. This is cleared when the segment is published.
+ */
+ public void addRealtimeSegmentSchema(SegmentId segmentId, RowSignature
rowSignature, long numRows)
+ {
+ realtimeSegmentSchema.put(segmentId, new SchemaPayloadPlus(new
SchemaPayload(rowSignature), numRows));
+ }
+
+ /**
+ * Cache an SMQ result. This entry is cleared when the SMQ result is published to the DB.
+ */
+ public void addInTransitSMQResult(
Review Comment:
**For follow-up PR:**
https://github.com/apache/druid/pull/16044
There seems to be no place in the code or documentation which seems to
explain this abbreviation `SMQ`.
Better to rename this methods to the likes of
`addTemporaryMetadataQueryResult`.
Also, `inTransit` is a little confusing as it implies that the result is
being sent somewhere.
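For example (the new name is only an illustration of the proposed style, not a final choice):

```suggestion
  public void addTemporaryMetadataQueryResult(
```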
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java:
##########
@@ -533,13 +534,13 @@ public TaskStatus getStatus(String taskId)
}
}
- public Set<DataSegment> getPublishedSegments(String taskId)
+ public DataSegmentsWithSchemas getPublishedSegments(String taskId)
Review Comment:
**For follow-up PR**
```suggestion
public DataSegmentsWithSchemas getPublishedSegmentsWithSchemas(String
taskId)
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java:
##########
@@ -58,7 +61,7 @@ public class InputSourceProcessor
*
* @return {@link SegmentsAndCommitMetadata} for the pushed segments.
*/
- public static SegmentsAndCommitMetadata process(
+ public static Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping> process(
Review Comment:
**For follow-up PR**
We should include `SegmentSchemaMapping` inside the
`SegmentsAndCommitMetadata` itself.
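A rough, hedged sketch of that shape, using stand-in record types rather than the real Druid classes (the field `segmentSchemaMapping` and accessor `segmentIdToFingerprint` are hypothetical names):

```java
import java.util.Map;
import java.util.Set;

public class SegmentsAndCommitMetadataSketch
{
  // Stand-ins for the real Druid types, only to show the shape.
  record DataSegment(String id) {}

  record SegmentSchemaMapping(Map<String, String> segmentIdToFingerprint) {}

  // Hypothetical: the schema mapping travels with the push result, so
  // InputSourceProcessor.process() could keep returning a single object
  // instead of a Pair.
  record SegmentsAndCommitMetadata(
      Set<DataSegment> segments,
      Object commitMetadata,
      SegmentSchemaMapping segmentSchemaMapping
  ) {}

  public static void main(String[] args)
  {
    SegmentsAndCommitMetadata pushed = new SegmentsAndCommitMetadata(
        Set.of(new DataSegment("seg1")),
        null,
        new SegmentSchemaMapping(Map.of("seg1", "fp1"))
    );
    // Callers read the mapping off the result instead of unpacking a Pair.
    System.out.println(pushed.segmentSchemaMapping().segmentIdToFingerprint().get("seg1"));
  }
}
```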
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -905,7 +907,7 @@ private TaskStatus generateAndPublishSegments(
try (final BatchAppenderatorDriver driver =
BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
driver.startJob();
- SegmentsAndCommitMetadata pushed = InputSourceProcessor.process(
+ Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping>
commitMetadataAndSchema = InputSourceProcessor.process(
Review Comment:
**For follow-up PR**
Why is `SegmentSchemaMapping` not included inside the
`SegmentsAndCommitMetadata` object itself?
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -906,6 +996,40 @@ public Void withHandle(Handle handle)
}
}
+ public void createSegmentSchemaTable(final String tableName)
+ {
+ createTable(
+ tableName,
+ ImmutableList.of(
+ StringUtils.format(
+ "CREATE TABLE %1$s (\n"
+ + " id %2$s NOT NULL,\n"
+ + " created_date VARCHAR(255) NOT NULL,\n"
Review Comment:
The `start`, `end`, and `created_date` columns of the `segments` table are also varchar. Seems best to stick to that for this table too.
##########
integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java:
##########
@@ -163,4 +163,8 @@ public class TestNGGroup
public static final String HTTP_ENDPOINT = "http-endpoint";
public static final String CENTRALIZED_DATASOURCE_SCHEMA =
"centralized-datasource-schema";
+
+ public static final String CDS_TASK_SCHEMA_PUBLISH_DISABLED =
"cds-task-schema-publish-disabled";
+
+ public static final String CDS_COORDINATOR_SMQ_DISABLED =
"cds-coordinator-smq-disabled";
Review Comment:
We might as well just call it `metadata-query`. `SMQ` is not a known or
documented Druid abbreviation and is bound to lead to confusion.
##########
docs/operations/metrics.md:
##########
@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||
+|`metadatacache/backfill/count`|Number of segments for which schema was backfilled in the database.|`dataSource`|
+|`schemacache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments.|
+|`schemacache/finalizedSegmentMetadata/count`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.|
+|`schemacache/finalizedSchemaPayload/count`|Number of finalized segment schemas cached.||Depends on the number of distinct schemas in the cluster.|
+|`schemacache/inTransitSMQResults/count`|Number of segments for which schema was fetched by executing a segment metadata query.||Eventually it should be 0.|
+|`schemacache/inTransitSMQPublishedResults/count`|Number of segments for which schema is cached after backfilling in the database.||Eventually it should be 0.|
Review Comment:
**For follow-up PR:**
Is `schemacache/` not the same as `metadatacache`? The similar yet different
names can be confusing.
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Functions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentSchemaManager.class);
+ private static final int DB_ACTION_PARTITION_SIZE = 100;
+ private final MetadataStorageTablesConfig dbTables;
+ private final ObjectMapper jsonMapper;
+ private final SQLMetadataConnector connector;
+
+ @Inject
+ public SegmentSchemaManager(
+ MetadataStorageTablesConfig dbTables,
+ ObjectMapper jsonMapper,
+ SQLMetadataConnector connector
+ )
+ {
+ this.dbTables = dbTables;
+ this.jsonMapper = jsonMapper;
+ this.connector = connector;
+ }
+
+ /**
+ * Returns the list of schema fingerprints which are referenced by used segments but are themselves marked as unused.
+ */
+ public List<String> findReferencedSchemaMarkedAsUnused()
+ {
+ return connector.retryWithHandle(
+ handle ->
+ handle.createQuery(
+ StringUtils.format(
+ "SELECT DISTINCT(schema_fingerprint) FROM %s WHERE
used = true AND schema_fingerprint IN (SELECT fingerprint FROM %s WHERE used =
false)",
+ dbTables.getSegmentsTable(),
+ dbTables.getSegmentSchemasTable()
+ ))
+ .mapTo(String.class)
+ .list()
+ );
+ }
+
+ public int markSchemaAsUsed(List<String> schemaFingerprints)
+ {
+ if (schemaFingerprints.isEmpty()) {
+ return 0;
+ }
+ String inClause = getInClause(schemaFingerprints.stream());
+
+ return connector.retryWithHandle(
+ handle ->
+ handle.createStatement(
+ StringUtils.format(
+ "UPDATE %s SET used = true, used_status_last_updated
= :now"
+ + " WHERE fingerprint IN (%s)",
+ dbTables.getSegmentSchemasTable(), inClause
+ )
+ )
+ .bind("now", DateTimes.nowUtc().toString())
+ .execute()
+ );
+ }
+
+ public int deleteSchemasOlderThan(long timestamp)
+ {
+ return connector.retryWithHandle(
+ handle -> handle.createStatement(
+ StringUtils.format(
+ "DELETE FROM %s WHERE used = false AND
used_status_last_updated < :now",
+ dbTables.getSegmentSchemasTable()
+ ))
+ .bind("now", DateTimes.utc(timestamp).toString())
+ .execute());
+ }
+
+ public int markUnreferencedSchemasAsUnused()
+ {
+ return connector.retryWithHandle(
+ handle ->
+ handle.createStatement(
+ StringUtils.format(
+ "UPDATE %s SET used = false,
used_status_last_updated = :now WHERE used != false "
+ + "AND fingerprint NOT IN (SELECT
DISTINCT(schema_fingerprint) FROM %s WHERE used=true AND schema_fingerprint IS
NOT NULL)",
+ dbTables.getSegmentSchemasTable(),
+ dbTables.getSegmentsTable()
+ )
+ )
+ .bind("now", DateTimes.nowUtc().toString())
+ .execute());
+ }
+
+ /**
+ * Persist segment schema and update segments in a transaction.
+ */
+ public void persistSchemaAndUpdateSegmentsTable(
+ final String dataSource,
+ final List<SegmentSchemaMetadataPlus> segmentSchemas,
+ final int version
+ )
+ {
+ connector.retryTransaction((TransactionCallback<Void>) (handle, status) ->
{
+ Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>();
+
+ for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) {
+ schemaPayloadMap.put(
+ segmentSchema.getFingerprint(),
+ segmentSchema.getSegmentSchemaMetadata().getSchemaPayload()
+ );
+ }
+ persistSegmentSchema(handle, dataSource, version, schemaPayloadMap);
+ updateSegmentWithSchemaInformation(handle, segmentSchemas);
+
+ return null;
+ }, 1, 3);
+ }
+
+ /**
+ * Persist unique segment schemas in the DB.
+ */
+ public void persistSegmentSchema(
+ final Handle handle,
+ final String dataSource,
+ final int version,
+ final Map<String, SchemaPayload> fingerprintSchemaPayloadMap
+ ) throws JsonProcessingException
+ {
+    // Filter out schemas that already exist in the DB
+ Map<Boolean, Set<String>> existingFingerprintsAndUsedStatus =
fingerprintExistBatch(
+ handle,
+ fingerprintSchemaPayloadMap.keySet()
+ );
+
+ // Used schema can also be marked as unused by the schema cleanup duty in
parallel.
+ // Refer to the javadocs in
org.apache.druid.server.coordinator.duty.KillUnreferencedSegmentSchemaDuty for
more details.
+    Set<String> usedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(true)
+                                           ? existingFingerprintsAndUsedStatus.get(true)
+                                           : new HashSet<>();
+    Set<String> unusedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(false)
+                                             ? existingFingerprintsAndUsedStatus.get(false)
+                                             : new HashSet<>();
+    Set<String> existingFingerprints = Sets.union(usedExistingFingerprints, unusedExistingFingerprints);
+ if (existingFingerprints.size() > 0) {
+ log.info(
+          "Found already existing schema in the DB for dataSource [%1$s]. "
+          + "Used fingerprints: [%2$s], Unused fingerprints: [%3$s].",
+ dataSource,
+ usedExistingFingerprints,
+ unusedExistingFingerprints
+ );
+ }
+
+ // Unused schema can be deleted by the schema cleanup duty in parallel.
+ // Refer to the javadocs in
org.apache.druid.server.coordinator.duty.KillUnreferencedSegmentSchemaDuty for
more details.
+ if (unusedExistingFingerprints.size() > 0) {
+      // mark the unused schemas as used to prevent their deletion
+ String inClause = getInClause(unusedExistingFingerprints.stream());
+
+ handle.createStatement(
+ StringUtils.format(
+ "UPDATE %s SET used = true, used_status_last_updated =
:now"
+ + " WHERE fingerprint IN (%s)",
+ dbTables.getSegmentSchemasTable(), inClause
+ )
+ )
+ .bind("now", DateTimes.nowUtc().toString())
+ .execute();
+ }
+
+ Map<String, SchemaPayload> schemaPayloadToCreate = new HashMap<>();
+
+ for (Map.Entry<String, SchemaPayload> entry :
fingerprintSchemaPayloadMap.entrySet()) {
+ if (!existingFingerprints.contains(entry.getKey())) {
+ schemaPayloadToCreate.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ if (schemaPayloadToCreate.isEmpty()) {
+ log.info("No schema to persist for dataSource [%s] and version [%s].",
dataSource, version);
+ return;
+ }
+
+ final List<List<String>> partitionedFingerprints = Lists.partition(
+ new ArrayList<>(schemaPayloadToCreate.keySet()),
+ DB_ACTION_PARTITION_SIZE
+ );
+
+ String insertSql = StringUtils.format(
+ "INSERT INTO %s (created_date, datasource, fingerprint, payload, used,
used_status_last_updated, version) "
+ + "VALUES (:created_date, :datasource, :fingerprint, :payload, :used,
:used_status_last_updated, :version)",
+ dbTables.getSegmentSchemasTable()
+ );
+
+ // insert schemas
+ PreparedBatch schemaInsertBatch = handle.prepareBatch(insertSql);
+ for (List<String> partition : partitionedFingerprints) {
+ for (String fingerprint : partition) {
+ final String now = DateTimes.nowUtc().toString();
+ schemaInsertBatch.add()
+ .bind("created_date", now)
+ .bind("datasource", dataSource)
+ .bind("fingerprint", fingerprint)
+ .bind("payload",
jsonMapper.writeValueAsBytes(fingerprintSchemaPayloadMap.get(fingerprint)))
+ .bind("used", true)
+ .bind("used_status_last_updated", now)
+ .bind("version", version);
+ }
+ final int[] affectedRows = schemaInsertBatch.execute();
+ final List<String> failedInserts = new ArrayList<>();
+ for (int i = 0; i < partition.size(); ++i) {
+ if (affectedRows[i] != 1) {
+ failedInserts.add(partition.get(i));
+ }
+ }
+ if (failedInserts.isEmpty()) {
+ log.info(
+ "Published schemas [%s] to DB for datasource [%s] and version
[%s]",
+ partition,
+ dataSource,
+ version
+ );
+ } else {
+ throw new ISE(
+ "Failed to publish schemas [%s] to DB for datasource [%s] and
version [%s]",
+ failedInserts,
+ dataSource,
+ version
+ );
+ }
+ }
+ }
+
+ /**
+ * Update segment with schemaId and numRows information.
+ */
+ public void updateSegmentWithSchemaInformation(
+ final Handle handle,
+ final List<SegmentSchemaMetadataPlus> batch
+ )
+ {
+ log.debug("Updating segment with schema and numRows information: [%s].",
batch);
+
+ // update schemaId and numRows in segments table
Review Comment:
```suggestion
// update fingerprint and numRows in segments table
```
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -540,25 +560,51 @@ tableName, getSerialType(), getPayloadType()
}
/**
- * Adds the used_status_last_updated column to the "segments" table.
+ * Adds new columns (used_status_last_updated) to the "segments" table.
+ * Conditionally, also adds the schema_fingerprint and num_rows columns.
*/
- protected void alterSegmentTableAddUsedFlagLastUpdated()
+ protected void alterSegmentTable()
{
final String tableName = tablesConfigSupplier.get().getSegmentsTable();
- if (tableHasColumn(tableName, "used_status_last_updated")) {
- log.info("Table[%s] already has column[used_status_last_updated].",
tableName);
- } else {
- log.info("Adding column[used_status_last_updated] to table[%s].",
tableName);
- alterTable(
- tableName,
- ImmutableList.of(
- StringUtils.format(
- "ALTER TABLE %1$s ADD used_status_last_updated varchar(255)",
- tableName
- )
- )
- );
+
+ Map<String, String> columnNameTypes = new HashMap<>();
+ columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
+
+ if (centralizedDatasourceSchemaConfig.isEnabled()) {
+ columnNameTypes.put("schema_fingerprint", "VARCHAR(255)");
+ columnNameTypes.put("num_rows", "BIGINT");
+ }
+
+ Set<String> columnsToAdd = new HashSet<>();
Review Comment:
**For follow-up PR:**
Why build a `columnsToAdd` list and then an `alterCommands` list? Why not
build the `alterCommands` directly?
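A minimal, self-contained sketch of building the `alterCommands` list directly (the helper and variable names are illustrative, not the actual Druid code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AlterCommandsSketch
{
  // Builds the ALTER commands directly from the column -> type map,
  // skipping columns the table already has, with no intermediate set.
  static List<String> buildAlterCommands(String tableName, Map<String, String> columnTypes, List<String> existingColumns)
  {
    List<String> alterCommands = new ArrayList<>();
    for (Map.Entry<String, String> e : columnTypes.entrySet()) {
      if (existingColumns.contains(e.getKey())) {
        System.out.println("Table[" + tableName + "] already has column[" + e.getKey() + "].");
      } else {
        alterCommands.add(String.format("ALTER TABLE %s ADD %s %s", tableName, e.getKey(), e.getValue()));
      }
    }
    return alterCommands;
  }

  public static void main(String[] args)
  {
    Map<String, String> columns = new LinkedHashMap<>();
    columns.put("used_status_last_updated", "VARCHAR(255)");
    columns.put("num_rows", "BIGINT");
    System.out.println(buildAlterCommands("druid_segments", columns, List.of("used_status_last_updated")));
  }
}
```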
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -312,23 +318,37 @@ tableName, getPayloadType(), getCollation()
public void createSegmentTable(final String tableName)
{
+ List<String> columns = new ArrayList<>();
+ columns.add("id VARCHAR(255) NOT NULL");
+ columns.add("dataSource VARCHAR(255) %4$s NOT NULL");
+ columns.add("created_date VARCHAR(255) NOT NULL");
+ columns.add("start VARCHAR(255) NOT NULL");
+ columns.add("%3$send%3$s VARCHAR(255) NOT NULL");
+ columns.add("partitioned BOOLEAN NOT NULL");
+ columns.add("version VARCHAR(255) NOT NULL");
+ columns.add("used BOOLEAN NOT NULL");
+ columns.add("payload %2$s NOT NULL");
+ columns.add("used_status_last_updated VARCHAR(255) NOT NULL");
+
+ if (centralizedDatasourceSchemaConfig.isEnabled()) {
+ columns.add("schema_fingerprint VARCHAR(255)");
+ columns.add("num_rows BIGINT");
+ }
+
+ StringBuilder createStatementBuilder = new StringBuilder("CREATE TABLE
%1$s (");
+
+ for (String column : columns) {
+ createStatementBuilder.append(column);
+ createStatementBuilder.append(",");
Review Comment:
**For follow-up PR:**
Nit: We seem to have removed the newline characters. They formatted the statement nicely in case we wanted to debug it.
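For example, the separator could keep the newline (a sketch against the loop shown above):

```suggestion
      createStatementBuilder.append(column);
      createStatementBuilder.append(",\n");
```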
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1056,12 +1136,19 @@ protected boolean tableHasColumn(String tableName,
String columnName)
*/
private void validateSegmentsTable()
{
- if (tableHasColumn(tablesConfigSupplier.get().getSegmentsTable(),
"used_status_last_updated")) {
+ String segmentsTables = tablesConfigSupplier.get().getSegmentsTable();
+
+ boolean schemaPersistenceRequirementMet =
Review Comment:
**For follow-up PR:**
Please rename this to something more suggestive of what it checks.
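For example (the name below is only illustrative of something more suggestive):

```suggestion
    final boolean segmentsTableHasSchemaColumns =
```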
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java:
##########
@@ -1016,10 +1018,13 @@ private TaskToolboxFactory createToolboxFactory(
TaskActionClientFactory taskActionClientFactory
)
{
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig = new
CentralizedDatasourceSchemaConfig();
+ centralizedDatasourceSchemaConfig.setEnabled(true);
TestTaskToolboxFactory.Builder builder = new
TestTaskToolboxFactory.Builder()
.setConfig(taskConfig)
.setIndexIO(new IndexIO(getObjectMapper(), ColumnConfig.DEFAULT))
- .setTaskActionClientFactory(taskActionClientFactory);
+ .setTaskActionClientFactory(taskActionClientFactory)
+ .setCentralizedTableSchemaConfig(centralizedDatasourceSchemaConfig);
Review Comment:
**For follow-up PR**
Is this needed for these tests?
##########
extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java:
##########
@@ -64,10 +65,11 @@ public PostgreSQLConnector(
Supplier<MetadataStorageConnectorConfig> config,
Supplier<MetadataStorageTablesConfig> dbTables,
PostgreSQLConnectorConfig connectorConfig,
- PostgreSQLTablesConfig tablesConfig
+ PostgreSQLTablesConfig tablesConfig,
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
Review Comment:
**For follow-up PR**
Is this config bound to a non-null value even when the feature is disabled?
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -540,25 +560,51 @@ tableName, getSerialType(), getPayloadType()
}
/**
- * Adds the used_status_last_updated column to the "segments" table.
+ * Adds new columns (used_status_last_updated) to the "segments" table.
+ * Conditionally, add schema_fingerprint, num_rows columns.
*/
- protected void alterSegmentTableAddUsedFlagLastUpdated()
+ protected void alterSegmentTable()
{
final String tableName = tablesConfigSupplier.get().getSegmentsTable();
- if (tableHasColumn(tableName, "used_status_last_updated")) {
- log.info("Table[%s] already has column[used_status_last_updated].",
tableName);
- } else {
- log.info("Adding column[used_status_last_updated] to table[%s].",
tableName);
- alterTable(
- tableName,
- ImmutableList.of(
- StringUtils.format(
- "ALTER TABLE %1$s ADD used_status_last_updated varchar(255)",
- tableName
- )
- )
- );
+
+ Map<String, String> columnNameTypes = new HashMap<>();
+ columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
+
+ if (centralizedDatasourceSchemaConfig.isEnabled()) {
+ columnNameTypes.put("schema_fingerprint", "VARCHAR(255)");
+ columnNameTypes.put("num_rows", "BIGINT");
+ }
+
+ Set<String> columnsToAdd = new HashSet<>();
+
+ for (String columnName : columnNameTypes.keySet()) {
+ if (tableHasColumn(tableName, columnName)) {
+ log.info("Table[%s] already has column[%s].", tableName, columnName);
+ } else {
+ columnsToAdd.add(columnName);
+ }
+ }
+
+ List<String> alterCommands = new ArrayList<>();
+ if (!columnsToAdd.isEmpty()) {
+ for (String columnName : columnsToAdd) {
+ alterCommands.add(
+ StringUtils.format(
+ "ALTER TABLE %1$s ADD %2$s %3$s",
+ tableName,
+ columnName,
+ columnNameTypes.get(columnName)
+ )
+ );
+ }
+
+ log.info("Adding columns %s to table[%s].", columnsToAdd, tableName);
}
+
+ alterTable(
+ tableName,
+ alterCommands
+ );
Review Comment:
**For follow-up PR:**
Nit: Please don't break lines unless needed. Method arguments need not always
be on separate lines. In fact, Druid style prefers them on the same line
unless they cross 80 characters (where breaking is optional) or 120 characters
(where breaking is required).
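For illustration, the command-building loop in the hunk above can be exercised in isolation. A minimal self-contained sketch of the same idempotent-migration idea (the table name, column map, and `existingColumns` input are illustrative stand-ins, not Druid's actual API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AlterCommandBuilder
{
  // Builds one ALTER TABLE statement per column that the table does not already have.
  public static List<String> buildAlterCommands(
      String tableName,
      Map<String, String> columnNameTypes,
      Set<String> existingColumns
  )
  {
    final List<String> alterCommands = new ArrayList<>();
    for (Map.Entry<String, String> column : columnNameTypes.entrySet()) {
      if (!existingColumns.contains(column.getKey())) {
        alterCommands.add(
            String.format("ALTER TABLE %s ADD %s %s", tableName, column.getKey(), column.getValue())
        );
      }
    }
    return alterCommands;
  }

  public static void main(String[] args)
  {
    final Map<String, String> columns = new LinkedHashMap<>();
    columns.put("used_status_last_updated", "VARCHAR(255)");
    columns.put("schema_fingerprint", "VARCHAR(255)");
    columns.put("num_rows", "BIGINT");

    // Pretend the table already has used_status_last_updated; only the
    // two missing columns get ALTER statements.
    System.out.println(buildAlterCommands("segments", columns, Set.of("used_status_last_updated")));
  }
}
```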
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java:
##########
@@ -664,7 +665,7 @@ public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
}
}
- public Set<DataSegment> getPublishedSegments(Task task)
+ public DataSegmentsWithSchemas getSegmentAndSchemas(Task task)
Review Comment:
**For follow-up PR**
```suggestion
public DataSegmentsWithSchemas getPublishedSegmentsWithSchemas(Task task)
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java:
##########
@@ -509,6 +548,21 @@ public Map<String, Long> getBlacklistedTaskSlotCount()
}
}
+ public void verifySchema(DataSegmentsWithSchemas dataSegmentsWithSchemas)
Review Comment:
**For follow-up PR**
What does it verify? Maybe add a javadoc.
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDuty.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.metadata.SegmentSchemaManager;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
+
+import java.util.List;
+
+/**
+ * Coordinator duty to clean up segment schemas that are not referenced by any used segment.
+ * <p>
+ * <ol>
+ * <li>If a schema is not referenced, UPDATE schemas SET used = false, used_status_last_updated = now</li>
+ * <li>DELETE FROM schemas WHERE used = false AND used_status_last_updated < 6 hours ago</li>
+ * <li>When creating a new segment, try to find schema for the fingerprint of the segment.</li>
+ * <ol type="a">
+ * <li> If no record found, create a new one.</li>
+ * <li> If record found which has used = true, reuse this schema_id.</li>
+ * <li> If record found which has used = false, UPDATE SET used = true, used_status_last_updated = now</li>
+ * </ol>
+ * </ol>
+ * </p>
+ * <p>
+ * Possible race conditions:
+ * <ol type="a">
+ * <li> Between ops 1 and 3b: In other words, we might end up with a segment that points to a schema that has just been marked as unused. This is repaired by the coordinator duty.</li>
+ * <li> Between 2 and 3c: This can be handled. Either 2 will fail to update any rows (good case) or 3c will fail to update any rows (bad case). In the bad case, we need to recreate the schema, same as step 3a.</li>
+ * </ol>
+ * </p>
+ */
+public class KillUnreferencedSegmentSchemaDuty extends MetadataCleanupDuty
Review Comment:
**For follow-up PR**
Please rename to `KillUnreferencedSegmentSchemas` for uniformity with the
other coordinator duties.
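The two cleanup steps in the javadoc above (mark unreferenced schemas as unused, then delete rows that stay unused past the retention window) can be simulated in isolation. A minimal in-memory sketch, assuming one `used` flag and one `used_status_last_updated` timestamp per schema row; the class and method names are illustrative, not Druid's:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SchemaCleanupSketch
{
  // Stand-in for one row of the schemas table.
  public static class SchemaRow
  {
    boolean used = true;
    long usedStatusLastUpdated;
  }

  private final Map<String, SchemaRow> schemas = new HashMap<>();

  public void addSchema(String fingerprint)
  {
    schemas.put(fingerprint, new SchemaRow());
  }

  // Step 1: mark schemas not referenced by any used segment as unused, stamping the time.
  public void markUnreferencedAsUnused(Set<String> referencedFingerprints, long now)
  {
    schemas.forEach((fingerprint, row) -> {
      if (row.used && !referencedFingerprints.contains(fingerprint)) {
        row.used = false;
        row.usedStatusLastUpdated = now;
      }
    });
  }

  // Step 2: delete schemas that have stayed unused for at least the retention period.
  public void deleteLongUnused(long now, long retentionMillis)
  {
    schemas.values().removeIf(row -> !row.used && now - row.usedStatusLastUpdated >= retentionMillis);
  }

  public boolean contains(String fingerprint)
  {
    return schemas.containsKey(fingerprint);
  }
}
```

Note that a schema re-marked as used between the two steps (race 3c in the javadoc) survives step 2, because deletion only considers rows still flagged unused.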
##########
server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import com.google.common.primitives.Ints;
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.SchemaPayload;
+
+import java.io.IOException;
+
+/**
+ * Utility to generate a fingerprint for an object.
+ */
+@LazySingleton
+public class FingerprintGenerator
+{
+ private static final Logger log = new Logger(FingerprintGenerator.class);
+
+ private final ObjectMapper objectMapper;
+
+ @Inject
+ public FingerprintGenerator(ObjectMapper objectMapper)
+ {
+ this.objectMapper = objectMapper;
+ }
+
+ /**
+ * Generates a fingerprint (hash string) for an object using the SHA-256 hash algorithm.
+ */
+ @SuppressWarnings("UnstableApiUsage")
+ public String generateFingerprint(SchemaPayload schemaPayload, String dataSource, int version)
+ {
+ try {
+ final Hasher hasher = Hashing.sha256().newHasher();
+
+ hasher.putBytes(objectMapper.writeValueAsBytes(schemaPayload));
+ hasher.putBytes(StringUtils.toUtf8(dataSource));
Review Comment:
**For follow-up PR**
Maybe include a delimiter between the hashed components, the same way as is
done in `PendingSegmentRecord`.
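The motivation for the suggested delimiter: hashing components back-to-back is ambiguous at the component boundary, so two different `(payload, dataSource)` pairs can yield the same fingerprint. A self-contained sketch with `java.security.MessageDigest` (the string inputs are made up for illustration):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class DelimitedHashDemo
{
  // Hashes the parts in order, appending the delimiter after each one.
  public static byte[] sha256(String delimiter, String... parts)
  {
    try {
      final MessageDigest digest = MessageDigest.getInstance("SHA-256");
      for (String part : parts) {
        digest.update(part.getBytes(StandardCharsets.UTF_8));
        digest.update(delimiter.getBytes(StandardCharsets.UTF_8));
      }
      return digest.digest();
    }
    catch (NoSuchAlgorithmException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args)
  {
    // With no delimiter, ("foo", "bar") and ("foob", "ar") both hash "foobar" and collide.
    System.out.println(Arrays.equals(sha256("", "foo", "bar"), sha256("", "foob", "ar"))); // prints true
    // A delimiter makes the component boundary explicit, so the fingerprints differ.
    System.out.println(Arrays.equals(sha256("\u0001", "foo", "bar"), sha256("\u0001", "foob", "ar"))); // prints false
  }
}
```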
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java:
##########
@@ -76,7 +76,7 @@ public void publishSegment(final DataSegment segment) throws IOException
);
}
- private void publishSegment(
+ void publishSegment(
Review Comment:
**For follow-up PR:**
I went through a bit of trouble (#16044) to make this private. 😅
Let's not make it visible if it's only for testing. Find an alternate
approach for verifying the changes.
Please try to avoid exposing private methods as it really breaks OOP.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java:
##########
@@ -417,12 +419,16 @@ public SegmentPublishResult publishAnnotatedSegments(
);
final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions);
action = taskLockType == TaskLockType.APPEND
- ? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata)
- : SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata);
+ ? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata,
+ segmentSchemaMapping
+ )
+ : SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata,
+ segmentSchemaMapping
+ );
Review Comment:
**For follow-up PR**
Please fix the formatting here.
##########
processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchema.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+
+/**
+ * Immutable wrapper class for segment and schema.
+ */
+public class DataSegmentWithSchema
Review Comment:
**For follow-up PR**
Rename to `DataSegmentWithMetadata` as it doesn't just have schema?
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java:
##########
@@ -65,7 +65,7 @@ public static void setup() throws IOException
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), UNUSED_V1));
actionTestKit.getMetadataStorageCoordinator()
- .commitSegments(expectedUnusedSegments);
+ .commitSegments(expectedUnusedSegments, null);
Review Comment:
**For follow-up PR**
Since passing `null` is a very common usage right now, it would be better to
keep two variants of the new methods. It would be easier to identify the
usages which pass non-null values and we could also avoid passing nulls all
over the place.
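The two-variant shape the comment suggests, sketched on a made-up class so it is self-contained (the names and types are illustrative, not Druid's actual metadata-coordinator API):

```java
import java.util.Map;
import java.util.Set;

public class CommitOverloadSketch
{
  // Variant for the common case: no segment schemas to record. Callers never pass null.
  public static String commitSegments(Set<String> segments)
  {
    return commitSegments(segments, null);
  }

  // Variant for callers that do have schema information; the null stays an internal detail.
  public static String commitSegments(Set<String> segments, Map<String, String> segmentSchemaMapping)
  {
    final int schemaCount = segmentSchemaMapping == null ? 0 : segmentSchemaMapping.size();
    return segments.size() + " segments committed with " + schemaCount + " schemas";
  }
}
```

With this shape, a search for callers of the two-argument variant immediately surfaces the code paths that actually supply schemas.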
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java:
##########
@@ -110,4 +115,14 @@ public TaskStatus runTask(TaskToolbox toolbox)
{
return status;
}
+
+ public TaskAction<SegmentPublishResult> testBuildPublishAction(
Review Comment:
**For follow-up PR**
```suggestion
public TaskAction<SegmentPublishResult> buildPublishAction(
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java:
##########
@@ -192,7 +193,8 @@ public Pair<Integer, ReadableInput> apply(ReadableInput readableInput)
frameContext.indexMerger(),
meters,
parseExceptionHandler,
- true
+ true,
+ CentralizedDatasourceSchemaConfig.create(false)
Review Comment:
Better to have this comment in the javadoc of
`CentralizedDatasourceSchemaConfig` itself.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java:
##########
@@ -433,16 +434,19 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
protected TaskAction<SegmentPublishResult> buildPublishAction(
Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish,
+ SegmentSchemaMapping segmentSchemaMapping,
TaskLockType lockType
)
{
switch (lockType) {
case REPLACE:
- return SegmentTransactionalReplaceAction.create(segmentsToPublish);
- return SegmentTransactionalReplaceAction.create(segmentsToPublish, segmentSchemaMapping);
case APPEND:
- return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
- return SegmentTransactionalAppendAction.forSegments(segmentsToPublish, segmentSchemaMapping);
default:
- return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish);
+ return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish,
Review Comment:
**For follow-up PR**
Please fix formatting here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java:
##########
@@ -68,22 +69,28 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
private final DataSourceMetadata endMetadata;
@Nullable
private final String dataSource;
+ @Nullable
+ private final SegmentSchemaMapping segmentSchemaMapping;
public static SegmentTransactionalInsertAction overwriteAction(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
- Set<DataSegment> segmentsToPublish
+ Set<DataSegment> segmentsToPublish,
+ @Nullable SegmentSchemaMapping segmentSchemaMapping
)
{
- return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null);
+ return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null,
Review Comment:
**For follow-up PR**
Please fix formatting here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]