kfaraz commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1578017863


##########
docs/configuration/index.md:
##########
@@ -1435,6 +1435,7 @@ MiddleManagers pass their configurations down to their 
child peons. The MiddleMa
 |`druid.worker.baseTaskDirs`|List of base temporary working directories, one 
of which is assigned per task in a round-robin fashion. This property can be 
used to allow usage of multiple disks for indexing. This property is 
recommended in place of and takes precedence over 
`${druid.indexer.task.baseTaskDir}`.  If this configuration is not set, 
`${druid.indexer.task.baseTaskDir}` is used. For example, 
`druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
 |`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by 
tasks on any single task dir. This value is treated symmetrically across all 
directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then 
each of those task directories is assumed to allow for 500 GB to be used and a 
total of 1.5 TB will potentially be available across all tasks. The actual 
amount of memory assigned to each task is discussed in [Configuring task 
storage 
sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`|
 |`druid.worker.category`|A string to name the category that the MiddleManager 
node belongs to.|`_default_worker_category`|
+|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This 
config should be set when CentralizedDatasourceSchema feature is enabled. 
|false| 

Review Comment:
   For follow-up PR:
   The config description should be more like `Indicates whether centralized 
schema management is enabled`. The description should also link to the page 
which contains the details of the feature.



##########
docs/operations/metrics.md:
##########
@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch 
datasource schema.||
 |`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch 
datasource schema.||
 |`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch 
datasource schema.||
+|`metadatacache/backfill/count`|Number of segments for which schema was back 
filled in the database.|`dataSource`|
+|`schemacache/realtime/count`|Number of realtime segments for which schema is 
cached.||Depends on the number of realtime segments.|

Review Comment:
   Do these rows render correctly? The preceding rows have only 3 columns, this 
one seems to have 4.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1056,12 +1136,19 @@ protected boolean tableHasColumn(String tableName, 
String columnName)
    */
   private void validateSegmentsTable()
   {
-    if (tableHasColumn(tablesConfigSupplier.get().getSegmentsTable(), 
"used_status_last_updated")) {
+    String segmentsTables = tablesConfigSupplier.get().getSegmentsTable();

Review Comment:
   **For follow-up PR:**
   ```suggestion
       final String segmentsTables = 
tablesConfigSupplier.get().getSegmentsTable();
   ```



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -927,6 +973,40 @@ public Void withHandle(Handle handle)
     }
   }
 
+  public void createSegmentSchemaTable(final String tableName)
+  {
+    createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"
+                + "  datasource VARCHAR(255) NOT NULL,\n"
+                + "  fingerprint VARCHAR(255) NOT NULL,\n"
+                + "  payload %3$s NOT NULL,\n"
+                + "  used BOOLEAN NOT NULL,\n"
+                + "  used_status_last_updated VARCHAR(255) NOT NULL,\n"
+                + "  version INTEGER NOT NULL,\n"
+                + "  PRIMARY KEY (id),\n"
+                + "  UNIQUE (fingerprint) \n"
+                + ")",
+                tableName, getSerialType(), getPayloadType()
+            ),
+            StringUtils.format("CREATE INDEX idx_%1$s_fingerprint ON 
%1$s(fingerprint)", tableName),
+            StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used)", 
tableName)

Review Comment:
   **For follow-up PR:**
   While finding schemas to delete, I think we would also use the column 
`used_status_last_updated`. If yes, then that should also be a part of the 
index.



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of segment schema.
+ * <p>
+ * Internally, mapping of segmentId to segment level information like schemaId 
& numRows is maintained.
+ * This mapping is updated on each database poll {@link 
SegmentSchemaCache#finalizedSegmentSchemaInfo}.
+ * Segment schema created since last DB poll is also fetched and updated in 
the cache {@code finalizedSegmentSchema}.
+ * <p>
+ * Additionally, this class caches schema for realtime segments in {@link 
SegmentSchemaCache#realtimeSegmentSchema}. This mapping
+ * is cleared either when the segment is removed or marked as finalized.
+ * <p>
+ * Finalized segments which do not have their schema information present in 
the DB, fetch their schema via SMQ.
+ * SMQ results are cached in {@link SegmentSchemaCache#inTransitSMQResults}. 
Once the schema information is backfilled
+ * in the DB, it is removed from {@link 
SegmentSchemaCache#inTransitSMQResults} and added to {@link 
SegmentSchemaCache#inTransitSMQPublishedResults}.
+ * {@link SegmentSchemaCache#inTransitSMQPublishedResults} is cleared on each 
successfull DB poll.
+ * <p>
+ * {@link CoordinatorSegmentMetadataCache} uses this cache to fetch schema for 
a segment.
+ * <p>
+ * Schema corresponding to the specified version in {@link 
CentralizedDatasourceSchemaConfig#SCHEMA_VERSION} is cached.
+ */
+@LazySingleton
+public class SegmentSchemaCache
+{
+  private static final Logger log = new Logger(SegmentSchemaCache.class);
+
+  /**
+   * Cache is marked initialized after first DB poll.
+   */
+  private final AtomicReference<CountDownLatch> initialized = new 
AtomicReference<>(new CountDownLatch(1));
+
+  /**
+   * Finalized segment schema information.
+   */
+  private volatile FinalizedSegmentSchemaInfo finalizedSegmentSchemaInfo =
+      new FinalizedSegmentSchemaInfo(ImmutableMap.of(), ImmutableMap.of());
+
+  /**
+   * Schema information for realtime segment. This mapping is updated when 
schema for realtime segment is received.
+   * The mapping is removed when the segment is either removed or marked as 
finalized.
+   */
+  private final ConcurrentMap<SegmentId, SchemaPayloadPlus> 
realtimeSegmentSchema = new ConcurrentHashMap<>();
+
+  /**
+   * If the segment schema is fetched via SMQ, subsequently it is added here.
+   * The mapping is removed when the schema information is backfilled in the 
DB.
+   */
+  private final ConcurrentMap<SegmentId, SchemaPayloadPlus> 
inTransitSMQResults = new ConcurrentHashMap<>();
+
+  /**
+   * Once the schema information is backfilled in the DB, it is added here.
+   * This map is cleared after each DB poll.
+   * After the DB poll and before clearing this map it is possible that some 
results were added to this map.
+   * These results would get lost after clearing this map.
+   * But, it should be fine since the schema could be retrieved if needed 
using SMQ, also the schema would be available in the next poll.
+   */
+  private final ConcurrentMap<SegmentId, SchemaPayloadPlus> 
inTransitSMQPublishedResults = new ConcurrentHashMap<>();
+
+  private final ServiceEmitter emitter;
+
+  @Inject
+  public SegmentSchemaCache(ServiceEmitter emitter)
+  {
+    this.emitter = emitter;
+  }
+
+  public void setInitialized()
+  {
+    if (!isInitialized()) {
+      initialized.get().countDown();
+      log.info("SegmentSchemaCache is initialized.");
+    }
+  }
+
+  /**
+   * This method is called when the current node is no longer the leader.
+   * The schema is cleared except for {@code realtimeSegmentSchemaMap}.
+   * Realtime schema continues to be updated on both the leader and follower 
nodes.
+   */
+  public void onLeaderStop()
+  {
+    initialized.set(new CountDownLatch(1));
+
+    finalizedSegmentSchemaInfo = new 
FinalizedSegmentSchemaInfo(ImmutableMap.of(), ImmutableMap.of());
+    inTransitSMQResults.clear();
+    inTransitSMQPublishedResults.clear();
+  }
+
+  public boolean isInitialized()
+  {
+    return initialized.get().getCount() == 0;
+  }
+
+  /**
+   * {@link CoordinatorSegmentMetadataCache} startup waits on the cache 
initialization.
+   * This is being done to ensure that we don't execute SMQ for segment with 
schema already present in the DB.
+   */
+  public void awaitInitialization() throws InterruptedException
+  {
+    initialized.get().await();
+  }
+
+  /**
+   * This method is called after each DB Poll. It updates reference for 
segment metadata and schema maps.
+   */
+  public void updateFinalizedSegmentSchema(FinalizedSegmentSchemaInfo 
finalizedSegmentSchemaInfo)
+  {
+    this.finalizedSegmentSchemaInfo = finalizedSegmentSchemaInfo;
+    setInitialized();
+  }
+
+  /**
+   * Cache schema for realtime segment. This is cleared when segment is 
published.
+   */
+  public void addRealtimeSegmentSchema(SegmentId segmentId, RowSignature 
rowSignature, long numRows)
+  {
+    realtimeSegmentSchema.put(segmentId, new SchemaPayloadPlus(new 
SchemaPayload(rowSignature), numRows));
+  }
+
+  /**
+   * Cache SMQ result. This entry is cleared when SMQ result is published to 
the DB.
+   */
+  public void addInTransitSMQResult(

Review Comment:
   **For follow-up PR:**
   https://github.com/apache/druid/pull/16044
   There seems to be no place in the code or documentation which seems to 
explain this abbreviation `SMQ`.
   Better to rename this methods to the likes of 
`addTemporaryMetadataQueryResult`.
   Also, `inTransit` is a little confusing as it implies that the result is 
being sent somewhere.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java:
##########
@@ -533,13 +534,13 @@ public TaskStatus getStatus(String taskId)
       }
     }
 
-    public Set<DataSegment> getPublishedSegments(String taskId)
+    public DataSegmentsWithSchemas getPublishedSegments(String taskId)

Review Comment:
   **For follow-up PR**
   ```suggestion
       public DataSegmentsWithSchemas getPublishedSegmentsWithSchemas(String 
taskId)
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java:
##########
@@ -58,7 +61,7 @@ public class InputSourceProcessor
    *
    * @return {@link SegmentsAndCommitMetadata} for the pushed segments.
    */
-  public static SegmentsAndCommitMetadata process(
+  public static Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping> process(

Review Comment:
   **For follow-up PR**
   We should include `SegmentSchemaMapping` inside the 
`SegmentsAndCommitMetadata` itself.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -905,7 +907,7 @@ private TaskStatus generateAndPublishSegments(
     try (final BatchAppenderatorDriver driver = 
BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
       driver.startJob();
 
-      SegmentsAndCommitMetadata pushed = InputSourceProcessor.process(
+      Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping> 
commitMetadataAndSchema = InputSourceProcessor.process(

Review Comment:
   **For follow-up PR**
   Why is `SegmentSchemaMapping` not included inside the 
`SegmentsAndCommitMetadata` object itself?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -906,6 +996,40 @@ public Void withHandle(Handle handle)
     }
   }
 
+  public void createSegmentSchemaTable(final String tableName)
+  {
+    createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"

Review Comment:
   The `start`, `end`, `created_date` of `segments` table are also varchar. 
Seems best to stick to it for this table too.



##########
integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java:
##########
@@ -163,4 +163,8 @@ public class TestNGGroup
   public static final String HTTP_ENDPOINT = "http-endpoint";
 
   public static final String CENTRALIZED_DATASOURCE_SCHEMA = 
"centralized-datasource-schema";
+
+  public static final String CDS_TASK_SCHEMA_PUBLISH_DISABLED = 
"cds-task-schema-publish-disabled";
+
+  public static final String CDS_COORDINATOR_SMQ_DISABLED = 
"cds-coordinator-smq-disabled";

Review Comment:
   We might as well just call it `metadata-query`. `SMQ` is not a known or 
documented Druid abbreviation and is bound to lead to confusion.



##########
docs/operations/metrics.md:
##########
@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch 
datasource schema.||
 |`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch 
datasource schema.||
 |`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch 
datasource schema.||
+|`metadatacache/backfill/count`|Number of segments for which schema was back 
filled in the database.|`dataSource`|
+|`schemacache/realtime/count`|Number of realtime segments for which schema is 
cached.||Depends on the number of realtime segments.|
+|`schemacache/finalizedSegmentMetadata/count`|Number of finalized segments for 
which schema metadata is cached.||Depends on the number of segments in the 
cluster.|
+|`schemacache/finalizedSchemaPayload/count`|Number of finalized segment schema 
cached.||Depends on the number of distinct schema in the cluster.|
+|`schemacache/inTransitSMQResults/count`|Number of segments for which schema 
was fetched by executing segment metadata query.||Eventually it should be 0.|
+|`schemacache/inTransitSMQPublishedResults/count`|Number of segments for which 
schema is cached after back filling in the database.||Eventually it should be 
0.|

Review Comment:
   **For follow-up PR:**
   Is `schemacache/` not the same as `metadatacache`? The similar yet different 
names can be confusing.



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Functions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+  }
+
+  /**
+   * Return a list of schema fingerprints
+   */
+  public List<String> findReferencedSchemaMarkedAsUnused()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createQuery(
+                      StringUtils.format(
+                          "SELECT DISTINCT(schema_fingerprint) FROM %s WHERE 
used = true AND schema_fingerprint IN (SELECT fingerprint FROM %s WHERE used = 
false)",
+                          dbTables.getSegmentsTable(),
+                          dbTables.getSegmentSchemasTable()
+                      ))
+                  .mapTo(String.class)
+                  .list()
+    );
+  }
+
+  public int markSchemaAsUsed(List<String> schemaFingerprints)
+  {
+    if (schemaFingerprints.isEmpty()) {
+      return 0;
+    }
+    String inClause = getInClause(schemaFingerprints.stream());
+
+    return connector.retryWithHandle(
+        handle ->
+            handle.createStatement(
+                      StringUtils.format(
+                          "UPDATE %s SET used = true, used_status_last_updated 
= :now"
+                          + " WHERE fingerprint IN (%s)",
+                          dbTables.getSegmentSchemasTable(), inClause
+                      )
+                  )
+                  .bind("now", DateTimes.nowUtc().toString())
+                  .execute()
+    );
+  }
+
+  public int deleteSchemasOlderThan(long timestamp)
+  {
+    return connector.retryWithHandle(
+        handle -> handle.createStatement(
+                            StringUtils.format(
+                                "DELETE FROM %s WHERE used = false AND 
used_status_last_updated < :now",
+                                dbTables.getSegmentSchemasTable()
+                            ))
+                        .bind("now", DateTimes.utc(timestamp).toString())
+                        .execute());
+  }
+
+  public int markUnreferencedSchemasAsUnused()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createStatement(
+                      StringUtils.format(
+                          "UPDATE %s SET used = false, 
used_status_last_updated = :now  WHERE used != false "
+                          + "AND fingerprint NOT IN (SELECT 
DISTINCT(schema_fingerprint) FROM %s WHERE used=true AND schema_fingerprint IS 
NOT NULL)",
+                          dbTables.getSegmentSchemasTable(),
+                          dbTables.getSegmentsTable()
+                      )
+                  )
+                  .bind("now", DateTimes.nowUtc().toString())
+                  .execute());
+  }
+
+  /**
+   * Persist segment schema and update segments in a transaction.
+   */
+  public void persistSchemaAndUpdateSegmentsTable(
+      final String dataSource,
+      final List<SegmentSchemaMetadataPlus> segmentSchemas,
+      final int version
+  )
+  {
+    connector.retryTransaction((TransactionCallback<Void>) (handle, status) -> 
{
+      Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>();
+
+      for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) {
+        schemaPayloadMap.put(
+            segmentSchema.getFingerprint(),
+            segmentSchema.getSegmentSchemaMetadata().getSchemaPayload()
+        );
+      }
+      persistSegmentSchema(handle, dataSource, version, schemaPayloadMap);
+      updateSegmentWithSchemaInformation(handle, segmentSchemas);
+
+      return null;
+    }, 1, 3);
+  }
+
+  /**
+   * Persist unique segment schema in the DB.
+   */
+  public void persistSegmentSchema(
+      final Handle handle,
+      final String dataSource,
+      final int version,
+      final Map<String, SchemaPayload> fingerprintSchemaPayloadMap
+  ) throws JsonProcessingException
+  {
+    // Filter already existing schema
+    Map<Boolean, Set<String>> existingFingerprintsAndUsedStatus = 
fingerprintExistBatch(
+        handle,
+        fingerprintSchemaPayloadMap.keySet()
+    );
+
+    // Used schema can also be marked as unused by the schema cleanup duty in 
parallel.
+    // Refer to the javadocs in 
org.apache.druid.server.coordinator.duty.KillUnreferencedSegmentSchemaDuty for 
more details.
+    Set<String> usedExistingFingerprints = 
existingFingerprintsAndUsedStatus.containsKey(true)
+                                           ? 
existingFingerprintsAndUsedStatus.get(true)
+                                           : new HashSet<>();
+    Set<String> unusedExistingFingerprints = 
existingFingerprintsAndUsedStatus.containsKey(false)
+                                             ? 
existingFingerprintsAndUsedStatus.get(false)
+                                             : new HashSet<>();
+    Set<String> existingFingerprints = Sets.union(usedExistingFingerprints, 
unusedExistingFingerprints);
+    if (existingFingerprints.size() > 0) {
+      log.info(
+          "Found already existing schema in the DB for dataSource [%1$s]. "
+          + "Used fingeprints: [%2$s], Unused fingerprints: [%3$s].",
+          dataSource,
+          usedExistingFingerprints,
+          unusedExistingFingerprints
+      );
+    }
+
+    // Unused schema can be deleted by the schema cleanup duty in parallel.
+    // Refer to the javadocs in 
org.apache.druid.server.coordinator.duty.KillUnreferencedSegmentSchemaDuty for 
more details.
+    if (unusedExistingFingerprints.size() > 0) {
+      // make the unused schema as used to prevent deletion
+      String inClause = getInClause(unusedExistingFingerprints.stream());
+
+      handle.createStatement(
+                StringUtils.format(
+                    "UPDATE %s SET used = true, used_status_last_updated = 
:now"
+                    + " WHERE fingerprint IN (%s)",
+                    dbTables.getSegmentSchemasTable(), inClause
+                )
+            )
+            .bind("now", DateTimes.nowUtc().toString())
+            .execute();
+    }
+
+    Map<String, SchemaPayload> schemaPayloadToCreate = new HashMap<>();
+
+    for (Map.Entry<String, SchemaPayload> entry : 
fingerprintSchemaPayloadMap.entrySet()) {
+      if (!existingFingerprints.contains(entry.getKey())) {
+        schemaPayloadToCreate.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    if (schemaPayloadToCreate.isEmpty()) {
+      log.info("No schema to persist for dataSource [%s] and version [%s].", 
dataSource, version);
+      return;
+    }
+
+    final List<List<String>> partitionedFingerprints = Lists.partition(
+        new ArrayList<>(schemaPayloadToCreate.keySet()),
+        DB_ACTION_PARTITION_SIZE
+    );
+
+    String insertSql = StringUtils.format(
+        "INSERT INTO %s (created_date, datasource, fingerprint, payload, used, 
used_status_last_updated, version) "
+        + "VALUES (:created_date, :datasource, :fingerprint, :payload, :used, 
:used_status_last_updated, :version)",
+        dbTables.getSegmentSchemasTable()
+    );
+
+    // insert schemas
+    PreparedBatch schemaInsertBatch = handle.prepareBatch(insertSql);
+    for (List<String> partition : partitionedFingerprints) {
+      for (String fingerprint : partition) {
+        final String now = DateTimes.nowUtc().toString();
+        schemaInsertBatch.add()
+                         .bind("created_date", now)
+                         .bind("datasource", dataSource)
+                         .bind("fingerprint", fingerprint)
+                         .bind("payload", 
jsonMapper.writeValueAsBytes(fingerprintSchemaPayloadMap.get(fingerprint)))
+                         .bind("used", true)
+                         .bind("used_status_last_updated", now)
+                         .bind("version", version);
+      }
+      final int[] affectedRows = schemaInsertBatch.execute();
+      final List<String> failedInserts = new ArrayList<>();
+      for (int i = 0; i < partition.size(); ++i) {
+        if (affectedRows[i] != 1) {
+          failedInserts.add(partition.get(i));
+        }
+      }
+      if (failedInserts.isEmpty()) {
+        log.info(
+            "Published schemas [%s] to DB for datasource [%s] and version 
[%s]",
+            partition,
+            dataSource,
+            version
+        );
+      } else {
+        throw new ISE(
+            "Failed to publish schemas [%s] to DB for datasource [%s] and 
version [%s]",
+            failedInserts,
+            dataSource,
+            version
+        );
+      }
+    }
+  }
+
+  /**
+   * Update segment with schemaId and numRows information.
+   */
+  public void updateSegmentWithSchemaInformation(
+      final Handle handle,
+      final List<SegmentSchemaMetadataPlus> batch
+  )
+  {
+    log.debug("Updating segment with schema and numRows information: [%s].", 
batch);
+
+    // update schemaId and numRows in segments table

Review Comment:
   ```suggestion
       // update fingerprint and numRows in segments table
   ```



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -540,25 +560,51 @@ tableName, getSerialType(), getPayloadType()
   }
 
   /**
-   * Adds the used_status_last_updated column to the "segments" table.
+   * Adds new columns (used_status_last_updated) to the "segments" table.
+   * Conditionally, add schema_fingerprint, num_rows columns.
    */
-  protected void alterSegmentTableAddUsedFlagLastUpdated()
+  protected void alterSegmentTable()
   {
     final String tableName = tablesConfigSupplier.get().getSegmentsTable();
-    if (tableHasColumn(tableName, "used_status_last_updated")) {
-      log.info("Table[%s] already has column[used_status_last_updated].", 
tableName);
-    } else {
-      log.info("Adding column[used_status_last_updated] to table[%s].", 
tableName);
-      alterTable(
-          tableName,
-          ImmutableList.of(
-              StringUtils.format(
-                  "ALTER TABLE %1$s ADD used_status_last_updated varchar(255)",
-                  tableName
-              )
-          )
-      );
+
+    Map<String, String> columnNameTypes = new HashMap<>();
+    columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
+
+    if (centralizedDatasourceSchemaConfig.isEnabled()) {
+      columnNameTypes.put("schema_fingerprint", "VARCHAR(255)");
+      columnNameTypes.put("num_rows", "BIGINT");
+    }
+
+    Set<String> columnsToAdd = new HashSet<>();

Review Comment:
   **For follow-up PR:**
   Why build a `columnsToAdd` list and then an `alterCommands` list? Why not 
build the `alterCommands` directly?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -312,23 +318,37 @@ tableName, getPayloadType(), getCollation()
 
   public void createSegmentTable(final String tableName)
   {
+    List<String> columns = new ArrayList<>();
+    columns.add("id VARCHAR(255) NOT NULL");
+    columns.add("dataSource VARCHAR(255) %4$s NOT NULL");
+    columns.add("created_date VARCHAR(255) NOT NULL");
+    columns.add("start VARCHAR(255) NOT NULL");
+    columns.add("%3$send%3$s VARCHAR(255) NOT NULL");
+    columns.add("partitioned BOOLEAN NOT NULL");
+    columns.add("version VARCHAR(255) NOT NULL");
+    columns.add("used BOOLEAN NOT NULL");
+    columns.add("payload %2$s NOT NULL");
+    columns.add("used_status_last_updated VARCHAR(255) NOT NULL");
+
+    if (centralizedDatasourceSchemaConfig.isEnabled()) {
+      columns.add("schema_fingerprint VARCHAR(255)");
+      columns.add("num_rows BIGINT");
+    }
+
+    StringBuilder createStatementBuilder = new StringBuilder("CREATE TABLE 
%1$s (");
+
+    for (String column : columns) {
+      createStatementBuilder.append(column);
+      createStatementBuilder.append(",");

Review Comment:
   **For follow-up PR:**
   Nit: We seem to have removed the new line characters. They formatted the 
statement nicely in case we wanted to debug it.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1056,12 +1136,19 @@ protected boolean tableHasColumn(String tableName, 
String columnName)
    */
   private void validateSegmentsTable()
   {
-    if (tableHasColumn(tablesConfigSupplier.get().getSegmentsTable(), 
"used_status_last_updated")) {
+    String segmentsTables = tablesConfigSupplier.get().getSegmentsTable();
+
+    boolean schemaPersistenceRequirementMet =

Review Comment:
   **For follow-up PR:**
   Please rename this to something more suggestive of what it checks.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java:
##########
@@ -1016,10 +1018,13 @@ private TaskToolboxFactory createToolboxFactory(
       TaskActionClientFactory taskActionClientFactory
   )
   {
+    CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig = new 
CentralizedDatasourceSchemaConfig();
+    centralizedDatasourceSchemaConfig.setEnabled(true);
     TestTaskToolboxFactory.Builder builder = new 
TestTaskToolboxFactory.Builder()
         .setConfig(taskConfig)
         .setIndexIO(new IndexIO(getObjectMapper(), ColumnConfig.DEFAULT))
-        .setTaskActionClientFactory(taskActionClientFactory);
+        .setTaskActionClientFactory(taskActionClientFactory)
+        .setCentralizedTableSchemaConfig(centralizedDatasourceSchemaConfig);

Review Comment:
   **For follow-up PR**
   Is this needed for these tests?



##########
extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java:
##########
@@ -64,10 +65,11 @@ public PostgreSQLConnector(
       Supplier<MetadataStorageConnectorConfig> config,
       Supplier<MetadataStorageTablesConfig> dbTables,
       PostgreSQLConnectorConfig connectorConfig,
-      PostgreSQLTablesConfig tablesConfig
+      PostgreSQLTablesConfig tablesConfig,
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig

Review Comment:
   **For follow-up PR**
   Is this config bound to a non-null value even when the feature is disabled?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -540,25 +560,51 @@ tableName, getSerialType(), getPayloadType()
   }
 
   /**
-   * Adds the used_status_last_updated column to the "segments" table.
+   * Adds new columns (used_status_last_updated) to the "segments" table.
+   * Conditionally, add schema_fingerprint, num_rows columns.
    */
-  protected void alterSegmentTableAddUsedFlagLastUpdated()
+  protected void alterSegmentTable()
   {
     final String tableName = tablesConfigSupplier.get().getSegmentsTable();
-    if (tableHasColumn(tableName, "used_status_last_updated")) {
-      log.info("Table[%s] already has column[used_status_last_updated].", 
tableName);
-    } else {
-      log.info("Adding column[used_status_last_updated] to table[%s].", 
tableName);
-      alterTable(
-          tableName,
-          ImmutableList.of(
-              StringUtils.format(
-                  "ALTER TABLE %1$s ADD used_status_last_updated varchar(255)",
-                  tableName
-              )
-          )
-      );
+
+    Map<String, String> columnNameTypes = new HashMap<>();
+    columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
+
+    if (centralizedDatasourceSchemaConfig.isEnabled()) {
+      columnNameTypes.put("schema_fingerprint", "VARCHAR(255)");
+      columnNameTypes.put("num_rows", "BIGINT");
+    }
+
+    Set<String> columnsToAdd = new HashSet<>();
+
+    for (String columnName : columnNameTypes.keySet()) {
+      if (tableHasColumn(tableName, columnName)) {
+        log.info("Table[%s] already has column[%s].", tableName, columnName);
+      } else {
+        columnsToAdd.add(columnName);
+      }
+    }
+
+    List<String> alterCommands = new ArrayList<>();
+    if (!columnsToAdd.isEmpty()) {
+      for (String columnName : columnsToAdd) {
+        alterCommands.add(
+            StringUtils.format(
+                "ALTER TABLE %1$s ADD %2$s %3$s",
+                tableName,
+                columnName,
+                columnNameTypes.get(columnName)
+            )
+        );
+      }
+
+      log.info("Adding columns %s to table[%s].", columnsToAdd, tableName);
     }
+
+    alterTable(
+        tableName,
+        alterCommands
+    );

Review Comment:
   **For follow-up PR:**
   Nit: Please don't break lines unless needed. Method arguments need not 
always be on separate lines. In fact, Druid style prefers them to be on the 
same line unless they cross the limit of 80 chars (optionally) or 120 chars 
(definitely break then).



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java:
##########
@@ -664,7 +665,7 @@ public ListenableFuture<TaskStatusResponse> 
taskStatus(String taskId)
       }
     }
 
-    public Set<DataSegment> getPublishedSegments(Task task)
+    public DataSegmentsWithSchemas getSegmentAndSchemas(Task task)

Review Comment:
   **For follow-up PR**
   ```suggestion
       public DataSegmentsWithSchemas getPublishedSegmentsWithSchemas(Task task)
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java:
##########
@@ -509,6 +548,21 @@ public Map<String, Long> getBlacklistedTaskSlotCount()
     }
   }
 
+  public void verifySchema(DataSegmentsWithSchemas dataSegmentsWithSchemas)

Review Comment:
   **For follow-up PR**
   What does it verify? Maybe add a javadoc.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDuty.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.metadata.SegmentSchemaManager;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
+
+import java.util.List;
+
+/**
+ * Coordinator duty to clean up segment schema which are not referenced by any 
used segment.
+ * <p>
+ * <ol>
+ * <li>If a schema is not referenced, UPDATE schemas SET used = false, 
used_status_last_updated = now</li>
+ * <li>DELETE FROM schemas WHERE used = false AND used_status_last_updated < 6 
hours ago</li>
+ * <li>When creating a new segment, try to find schema for the fingerprint of 
the segment.</li>
+ *    <ol type="a">
+ *    <li> If no record found, create a new one.</li>
+ *    <li> If record found which has used = true, reuse this schema_id.</li>
+ *    <li> If record found which has used = false, UPDATE SET used = true, 
used_status_last_updated = now</li>
+ *    </ol>
+ * </ol>
+ * </p>
+ * <p>
+ * Possible race conditions:
+ *    <ol type="a">
+ *    <li> Between ops 1 and 3b: In other words, we might end up with a 
segment that points to a schema that has just been marked as unused. This is 
repaired by the coordinator duty.</li>
+ *    <li> Between 2 and 3c: This can be handled. Either 2 will fail to update 
any rows (good case) or 3c will fail to update any rows (bad case). In the bad 
case, we need to recreate the schema, same as step 3a. </li>
+ *    </ol>
+ * </p>
+ */
+public class KillUnreferencedSegmentSchemaDuty extends MetadataCleanupDuty

Review Comment:
   **For follow-up PR**
   Please rename to `KillUnreferencedSegmentSchemas` for uniformity with the 
other coordinator duties.



##########
server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import com.google.common.primitives.Ints;
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.SchemaPayload;
+
+import java.io.IOException;
+
+/**
+ * Utility to generate fingerprint for an object.
+ */
+@LazySingleton
+public class FingerprintGenerator
+{
+  private static final Logger log = new Logger(FingerprintGenerator.class);
+
+  private final ObjectMapper objectMapper;
+
+  @Inject
+  public FingerprintGenerator(ObjectMapper objectMapper)
+  {
+    this.objectMapper = objectMapper;
+  }
+
+  /**
+   * Generates fingerprint or hash string for an object using SHA-256 hash 
algorithm.
+   */
+  @SuppressWarnings("UnstableApiUsage")
+  public String generateFingerprint(SchemaPayload schemaPayload, String 
dataSource, int version)
+  {
+    try {
+      final Hasher hasher = Hashing.sha256().newHasher();
+
+      hasher.putBytes(objectMapper.writeValueAsBytes(schemaPayload));
+      hasher.putBytes(StringUtils.toUtf8(dataSource));

Review Comment:
   **For follow-up PR**
   Maybe include a delimiter in between the same way as is being done in 
`PendingSegmentRecord`.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java:
##########
@@ -76,7 +76,7 @@ public void publishSegment(final DataSegment segment) throws 
IOException
     );
   }
 
-  private void publishSegment(
+  void publishSegment(

Review Comment:
   **For follow-up PR:**
   I went through a bit of trouble (#16044) to make this private. 😅 
   Let's not make it visible if it's only for testing. Find an alternate 
approach for verifying the changes.
   Please try to avoid exposing private methods as it really breaks OOP.
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java:
##########
@@ -417,12 +419,16 @@ public SegmentPublishResult publishAnnotatedSegments(
         );
         final DataSourceMetadata endMetadata = 
runner.createDataSourceMetadata(finalPartitions);
         action = taskLockType == TaskLockType.APPEND
-                 ? 
SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, 
startMetadata, endMetadata)
-                 : 
SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, 
endMetadata);
+                 ? 
SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, 
startMetadata, endMetadata,
+                                                                           
segmentSchemaMapping
+        )
+                 : 
SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, 
endMetadata,
+                                                                 
segmentSchemaMapping
+                 );

Review Comment:
   **For follow-up PR**
   Please fix the formatting here.



##########
processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchema.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+
+/**
+ * Immutable wrapper class for segment and schema.
+ */
+public class DataSegmentWithSchema

Review Comment:
   **For follow-up PR**
   Rename to `DataSegmentWithMetadata` as it doesn't just have schema?



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java:
##########
@@ -65,7 +65,7 @@ public static void setup() throws IOException
     
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), 
UNUSED_V1));
 
     actionTestKit.getMetadataStorageCoordinator()
-                 .commitSegments(expectedUnusedSegments);
+                 .commitSegments(expectedUnusedSegments, null);

Review Comment:
   **For follow-up PR**
   Since passing `null` is a very common usage right now, it would be better to 
keep two variants of the new  methods. It would be easier to identify the 
usages which pass non-null values and we could also avoid passing nulls all 
over the place.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java:
##########
@@ -110,4 +115,14 @@ public TaskStatus runTask(TaskToolbox toolbox)
   {
     return status;
   }
+
+  public TaskAction<SegmentPublishResult> testBuildPublishAction(

Review Comment:
   **For follow-up PR**
   ```suggestion
     public TaskAction<SegmentPublishResult> buildPublishAction(
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java:
##########
@@ -192,7 +193,8 @@ public Pair<Integer, ReadableInput> apply(ReadableInput 
readableInput)
                   frameContext.indexMerger(),
                   meters,
                   parseExceptionHandler,
-                  true
+                  true,
+                  CentralizedDatasourceSchemaConfig.create(false)

Review Comment:
   Better to have this comment in the javadoc of 
`CentralizedDatasourceSchemaConfig` itself.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java:
##########
@@ -433,16 +434,19 @@ private boolean 
tryLockWithDetermineResult(TaskActionClient client, LockGranular
   protected TaskAction<SegmentPublishResult> buildPublishAction(
       Set<DataSegment> segmentsToBeOverwritten,
       Set<DataSegment> segmentsToPublish,
+      SegmentSchemaMapping segmentSchemaMapping,
       TaskLockType lockType
   )
   {
     switch (lockType) {
       case REPLACE:
-        return SegmentTransactionalReplaceAction.create(segmentsToPublish);
+        return SegmentTransactionalReplaceAction.create(segmentsToPublish, 
segmentSchemaMapping);
       case APPEND:
-        return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
+        return SegmentTransactionalAppendAction.forSegments(segmentsToPublish, 
segmentSchemaMapping);
       default:
-        return 
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, 
segmentsToPublish);
+        return 
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, 
segmentsToPublish,

Review Comment:
   **For follow-up PR**
   Please fix formatting here.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java:
##########
@@ -68,22 +69,28 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
   private final DataSourceMetadata endMetadata;
   @Nullable
   private final String dataSource;
+  @Nullable
+  private final SegmentSchemaMapping segmentSchemaMapping;
 
   public static SegmentTransactionalInsertAction overwriteAction(
       @Nullable Set<DataSegment> segmentsToBeOverwritten,
-      Set<DataSegment> segmentsToPublish
+      Set<DataSegment> segmentsToPublish,
+      @Nullable SegmentSchemaMapping segmentSchemaMapping
   )
   {
-    return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, 
segmentsToPublish, null, null, null);
+    return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, 
segmentsToPublish, null, null, null,

Review Comment:
   **For follow-up PR**
   Please fix formatting here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to