cryptoe commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1577708818
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1801,16 +1868,57 @@ public int deletePendingSegments(String dataSource)
);
}
+ private boolean shouldPersistSchema(SegmentSchemaMapping segmentSchemaMapping)
+ {
+ return schemaPersistEnabled
+ && segmentSchemaMapping != null
+ && segmentSchemaMapping.isNonEmpty();
+ }
+
+ private void persistSchema(
+ final Handle handle,
+ final Set<DataSegment> segments,
+ final SegmentSchemaMapping segmentSchemaMapping
+ ) throws JsonProcessingException
+ {
+ if (segmentSchemaMapping.getSchemaVersion() != CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) {
+ log.error(
+ "Schema version [%d] doesn't match the current version [%d]. Not persisting this schema [%s]. "
+ + "Schema for this segment will be populated by the schema backfill job in Coordinator.",
+ segmentSchemaMapping.getSchemaVersion(),
Review Comment:
This check should be outside the transaction. Let's create a follow-up patch
for that.
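A rough sketch of what the follow-up could look like, assuming the version check needs no database access and can therefore run before the transaction is opened. The class, the `inTransaction` helper, and the `CURRENT_SCHEMA_VERSION` constant are hypothetical stand-ins for illustration, not Druid code:

```java
import java.util.function.Supplier;

// Hypothetical stand-in, not Druid code: shows the schema version being
// validated before any transaction is opened.
class SchemaVersionPrecheck
{
  // Assumed placeholder for CentralizedDatasourceSchemaConfig.SCHEMA_VERSION.
  static final int CURRENT_SCHEMA_VERSION = 1;

  // Counts how many simulated transactions were opened.
  static int transactionsOpened = 0;

  // Pure check with no database access, so it is safe to call outside a transaction.
  static boolean isSchemaVersionCurrent(int schemaVersion)
  {
    return schemaVersion == CURRENT_SCHEMA_VERSION;
  }

  // Simulated transaction wrapper; real code would go through the metadata connector.
  static <T> T inTransaction(Supplier<T> work)
  {
    transactionsOpened++;
    return work.get();
  }

  // Returns true if the schema was "persisted", false if it was skipped
  // because of a version mismatch.
  static boolean persistSchemaIfVersionMatches(int schemaVersion)
  {
    if (!isSchemaVersionCurrent(schemaVersion)) {
      // Skipped without opening a transaction; per the log message in the
      // diff, the schema backfill job would populate it later.
      return false;
    }
    return inTransaction(() -> true);
  }
}
```

With this shape, a mismatched version never opens a transaction, and the transaction body only contains work that actually touches the metadata store.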
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -540,25 +560,51 @@ tableName, getSerialType(), getPayloadType()
}
/**
- * Adds the used_status_last_updated column to the "segments" table.
+ * Adds new columns (used_status_last_updated) to the "segments" table.
+ * Conditionally, add schema_fingerprint, num_rows columns.
*/
- protected void alterSegmentTableAddUsedFlagLastUpdated()
+ protected void alterSegmentTable()
{
final String tableName = tablesConfigSupplier.get().getSegmentsTable();
- if (tableHasColumn(tableName, "used_status_last_updated")) {
- log.info("Table[%s] already has column[used_status_last_updated].", tableName);
- } else {
- log.info("Adding column[used_status_last_updated] to table[%s].", tableName);
- alterTable(
- tableName,
- ImmutableList.of(
- StringUtils.format(
- "ALTER TABLE %1$s ADD used_status_last_updated varchar(255)",
- tableName
- )
- )
- );
+
+ Map<String, String> columnNameTypes = new HashMap<>();
+ columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
+
+ if (centralizedDatasourceSchemaConfig.isEnabled()) {
+ columnNameTypes.put("schema_fingerprint", "VARCHAR(255)");
+ columnNameTypes.put("num_rows", "BIGINT");
+ }
+
+ Set<String> columnsToAdd = new HashSet<>();
+
+ for (String columnName : columnNameTypes.keySet()) {
Review Comment:
We should add a test case that exercises this logic.
Note: follow-up item.
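One possible way to make this testable, sketched under assumptions: pull the "which columns are missing" decision into pure functions that take the existing column names as input, so a unit test does not need a live database. The class and method names below are hypothetical, the ALTER statement format is modeled on the one removed in this hunk, and `druid_segments` is only an example table name:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not Druid code: isolates the column-diffing logic
// so it can be unit tested without a metadata store.
class SegmentTableColumnPlanner
{
  // Columns the "segments" table should have, keyed by name with their SQL type.
  static Map<String, String> requiredColumns(boolean centralizedSchemaEnabled)
  {
    Map<String, String> columns = new LinkedHashMap<>();
    columns.put("used_status_last_updated", "VARCHAR(255)");
    if (centralizedSchemaEnabled) {
      columns.put("schema_fingerprint", "VARCHAR(255)");
      columns.put("num_rows", "BIGINT");
    }
    return columns;
  }

  // Required columns that are not yet present, sorted for deterministic output.
  static Set<String> columnsToAdd(Map<String, String> required, Set<String> existing)
  {
    Set<String> missing = new TreeSet<>();
    for (String columnName : required.keySet()) {
      if (!existing.contains(columnName)) {
        missing.add(columnName);
      }
    }
    return missing;
  }

  // One ALTER TABLE statement per missing column (assumed statement shape).
  static List<String> alterStatements(String tableName, Map<String, String> required, Set<String> toAdd)
  {
    List<String> statements = new ArrayList<>();
    for (String columnName : toAdd) {
      statements.add(
          String.format("ALTER TABLE %1$s ADD %2$s %3$s", tableName, columnName, required.get(columnName))
      );
    }
    return statements;
  }
}
```

A test could then cover the three interesting cases: the feature disabled (only `used_status_last_updated` is considered), the feature enabled on a table that already has `used_status_last_updated`, and a table that already has every column (no statements generated).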
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]