cryptoe commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1565436627


##########
server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java:
##########
@@ -217,4 +217,6 @@ List<Interval> getUnusedSegmentIntervals(
   void populateUsedFlagLastUpdatedAsync();
 
   void stopAsyncUsedFlagLastUpdatedUpdate();
+

Review Comment:
   Please add Javadoc to the method.
   Also, the method name should be `refreshSchema`.
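   For illustration, a minimal sketch of what that could look like (the signature here is hypothetical, since the added line is elided in this diff):
   ```java
   /**
    * Polls the segment schema table and refreshes the in-memory segment schema cache.
    * Intended to run alongside the regular segment poll on the leader Coordinator.
    */
   void refreshSchema();
   ```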



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1088,6 +1123,174 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Starting polling of segment and schema table. latestSchemaId is [%s].", latestSchemaId.get());
+
+    ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();

Review Comment:
   This can be an immutable map as well, since we do not have multiple writers.
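   A rough sketch of that shape (fragment only; it assumes the surrounding poll code from this PR, the single polling thread, and that the cache update accepts a plain Map):
   ```java
   // Filled by the single polling thread inside the ResultSetMapper, then frozen.
   final Map<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new HashMap<>();
   // ... populate segmentStats while mapping the segments query ...
   segmentSchemaCache.updateFinalizedSegmentStatsReference(ImmutableMap.copyOf(segmentStats));
   ```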



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1088,6 +1123,174 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Starting polling of segment and schema table. latestSchemaId is [%s].", latestSchemaId.get());
+
+    ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();
+
+    // some databases such as PostgreSQL require auto-commit turned off
+    // to stream results back, enabling transactions disables auto-commit
+    //
+    // setting connection to read-only will allow some database such as MySQL
+    // to automatically use read-only transaction mode, further optimizing the query
+    final List<DataSegment> segments = connector.inReadOnlyTransaction(
+        new TransactionCallback<List<DataSegment>>()
+        {
+          @Override
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
+          {
+            return handle
+                .createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable()))
+                .setFetchSize(connector.getStreamingFetchSize())
+                .map(
+                    new ResultSetMapper<DataSegment>()
+                    {
+                      @Override
+                      public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                      {
+                        try {
+                          DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                          segmentStats.put(
+                              segment.getId(),
+                              new SegmentSchemaCache.SegmentStats(
+                                  (Long) r.getObject("schema_id"),
+                                  (Long) r.getObject("num_rows")
+                              )
+                          );
+                          return replaceWithExistingSegmentIfPresent(segment);
+                        }
+                        catch (IOException e) {
+                          log.makeAlert(e, "Failed to read segment from db.").emit();
+                          // If one entry in database is corrupted doPoll() should continue to work overall. See
+                          // filter by `Objects::nonNull` below in this method.
+                          return null;
+                        }
+                      }
+                    }
+                )
+                .list();
+          }
+        }
+    );
+
+    ConcurrentMap<Long, SchemaPayload> schemaMap = new ConcurrentHashMap<>();
+
+    String schemaPollQuery;
+
+    Long lastSchemaIdPrePoll = latestSchemaId.get();
+    if (lastSchemaIdPrePoll == null) {
+      log.info("Executing full schema refresh.");
+      schemaPollQuery =
+          StringUtils.format(
+              "SELECT id, payload FROM %s WHERE version = '%s'",
+              getSegmentSchemaTable(),
+              CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+          );
+    } else {
+      schemaPollQuery = StringUtils.format(
+          "SELECT id, payload FROM %s WHERE version = '%s' AND id > %s",
+          getSegmentSchemaTable(),
+          CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
+          lastSchemaIdPrePoll
+      );
+    }
+    String finalSchemaPollQuery = schemaPollQuery;
+
+    final AtomicReference<Long> maxPolledId = new AtomicReference<>();
+    maxPolledId.set(lastSchemaIdPrePoll);
+
+    connector.inReadOnlyTransaction(new TransactionCallback<Object>()
+    {
+      @Override
+      public Object inTransaction(Handle handle, TransactionStatus status)
+      {
+        return handle.createQuery(finalSchemaPollQuery)
+                     .setFetchSize(connector.getStreamingFetchSize())
+                     .map(new ResultSetMapper<Void>()
+                     {
+                       @Override
+                       public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                       {
+                         try {
+                           long id = r.getLong("id");
+
+                           if (maxPolledId.get() == null || id > maxPolledId.get()) {
+                             maxPolledId.set(id);
+                           }
+
+                           schemaMap.put(
+                               r.getLong("id"),
+                               jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
+                           );
+                         }
+                         catch (IOException e) {
+                           log.makeAlert(e, "Failed to read schema from db.").emit();
+                         }
+                         return null;
+                       }
+                     }).list();
+      }
+    });
+
+    log.debug("SchemaMap polled from the database is [%s]", schemaMap);
+
+    if (lastSchemaIdPrePoll == null) {
+      // full refresh
+      segmentSchemaCache.updateFinalizedSegmentSchemaReference(schemaMap);
+    } else {
+      // delta update
+      schemaMap.forEach(segmentSchemaCache::addFinalizedSegmentSchema);
+    }
+    segmentSchemaCache.updateFinalizedSegmentStatsReference(segmentStats);
+    segmentSchemaCache.resetInTransitSMQResultPublishedOnDBPoll();

Review Comment:
   There can be a race between lines 1235-1247.
   Should we push this inside the transaction lock?
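   One way to read that suggestion (a sketch only, written as a lambda for brevity; it assumes the schemaMap/segmentStats variables from this PR and is not necessarily the final shape):
   ```java
   // Perform the cache updates while still inside the poll's read-only transaction,
   // so a concurrent poll cannot interleave between the schema and stats updates.
   connector.inReadOnlyTransaction((handle, status) -> {
     // ... run the schema query and fill schemaMap as above ...
     if (lastSchemaIdPrePoll == null) {
       segmentSchemaCache.updateFinalizedSegmentSchemaReference(schemaMap);   // full refresh
     } else {
       schemaMap.forEach(segmentSchemaCache::addFinalizedSegmentSchema);      // delta update
     }
     segmentSchemaCache.updateFinalizedSegmentStatsReference(segmentStats);
     return null;
   });
   ```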



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1088,6 +1123,174 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Starting polling of segment and schema table. latestSchemaId is [%s].", latestSchemaId.get());
+
+    ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();
+
+    // some databases such as PostgreSQL require auto-commit turned off
+    // to stream results back, enabling transactions disables auto-commit
+    //
+    // setting connection to read-only will allow some database such as MySQL
+    // to automatically use read-only transaction mode, further optimizing the query
+    final List<DataSegment> segments = connector.inReadOnlyTransaction(
+        new TransactionCallback<List<DataSegment>>()
+        {
+          @Override
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
+          {
+            return handle
+                .createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable()))
+                .setFetchSize(connector.getStreamingFetchSize())
+                .map(
+                    new ResultSetMapper<DataSegment>()
+                    {
+                      @Override
+                      public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                      {
+                        try {
+                          DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                          segmentStats.put(
+                              segment.getId(),
+                              new SegmentSchemaCache.SegmentStats(
+                                  (Long) r.getObject("schema_id"),
+                                  (Long) r.getObject("num_rows")
+                              )
+                          );
+                          return replaceWithExistingSegmentIfPresent(segment);
+                        }
+                        catch (IOException e) {
+                          log.makeAlert(e, "Failed to read segment from db.").emit();
+                          // If one entry in database is corrupted doPoll() should continue to work overall. See
+                          // filter by `Objects::nonNull` below in this method.
+                          return null;
+                        }
+                      }
+                    }
+                )
+                .list();
+          }
+        }
+    );
+
+    ConcurrentMap<Long, SchemaPayload> schemaMap = new ConcurrentHashMap<>();
+
+    String schemaPollQuery;
+
+    Long lastSchemaIdPrePoll = latestSchemaId.get();
+    if (lastSchemaIdPrePoll == null) {
+      log.info("Executing full schema refresh.");
+      schemaPollQuery =
+          StringUtils.format(
+              "SELECT id, payload FROM %s WHERE version = '%s'",
+              getSegmentSchemaTable(),
+              CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+          );
+    } else {
+      schemaPollQuery = StringUtils.format(
+          "SELECT id, payload FROM %s WHERE version = '%s' AND id > %s",
+          getSegmentSchemaTable(),
+          CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
+          lastSchemaIdPrePoll
+      );
+    }
+    String finalSchemaPollQuery = schemaPollQuery;

Review Comment:
   Nit: no need for this variable assignment. 
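   For example (a sketch derived from the queries above; declaring the variable final removes the need for the extra copy):
   ```java
   // Declare the query once as final so the anonymous TransactionCallback can
   // capture it directly, with no extra effectively-final copy.
   final String schemaPollQuery = (lastSchemaIdPrePoll == null)
       ? StringUtils.format(
           "SELECT id, payload FROM %s WHERE version = '%s'",
           getSegmentSchemaTable(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
       : StringUtils.format(
           "SELECT id, payload FROM %s WHERE version = '%s' AND id > %s",
           getSegmentSchemaTable(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION, lastSchemaIdPrePoll);
   ```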



##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -122,15 +148,180 @@ public ServerView.CallbackAction serverSegmentRemoved(
           @Override
          public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
           {
-            if (realtimeSegmentSchemaAnnouncement) {
-              updateSchemaForSegments(segmentSchemas);
-            }
+            updateSchemaForRealtimeSegments(segmentSchemas);
             return ServerView.CallbackAction.CONTINUE;
           }
         }
     );
   }
 
+  @LifecycleStart
+  @Override
+  public void start() throws InterruptedException
+  {
+    // noop, refresh is started only on leader node
+  }
+
+  @LifecycleStop
+  @Override
+  public void stop()
+  {
+    callbackExec.shutdownNow();
+    cacheExec.shutdownNow();
+    segmentSchemaCache.uninitialize();
+    segmentSchemaBackfillQueue.leaderStop();
+    cacheExecFuture = null;
+  }
+
+  public void leaderStart()
+  {
+    log.info("%s starting cache initialization.", getClass().getSimpleName());
+    try {
+      segmentSchemaBackfillQueue.leaderStart();
+      cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
+      if (config.isAwaitInitializationOnStart()) {
+        awaitInitialization();
+      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void leaderStop()
+  {
+    log.info("%s stopping cache.", getClass().getSimpleName());
+    cacheExecFuture.cancel(true);
+    segmentSchemaCache.uninitialize();
+    segmentSchemaBackfillQueue.leaderStop();
+  }
+
+  /**
+   * This method ensures that the refresh goes through only when schemaCache is initialized.
+   */
+  @Override
+  public synchronized void refreshWaitCondition() throws InterruptedException
+  {
+    segmentSchemaCache.awaitInitialization();
+  }
+
+  @Override
+  protected void unmarkSegmentAsMutable(SegmentId segmentId)
+  {
+    synchronized (lock) {
+      log.debug("SegmentId [%s] is marked as finalized.", segmentId);
+      mutableSegments.remove(segmentId);
+      // remove it from the realtime schema cache
+      segmentSchemaCache.realtimeSegmentRemoved(segmentId);
+    }
+  }
+
+  @Override
+  protected void removeSegmentAction(SegmentId segmentId)
+  {
+    log.debug("SegmentId [%s] is removed.", segmentId);
+    segmentSchemaCache.segmentRemoved(segmentId);
+  }
+
+  @Override
+  protected boolean smqAction(
+      String dataSource,
+      SegmentId segmentId,
+      RowSignature rowSignature,
+      SegmentAnalysis analysis
+  )
+  {
+    AtomicBoolean added = new AtomicBoolean(false);
+    segmentMetadataInfo.compute(
+        dataSource,
+        (datasourceKey, dataSourceSegments) -> {
+          if (dataSourceSegments == null) {
+            // Datasource may have been removed or become unavailable while this refresh was ongoing.
+            log.warn(
+                "No segment map found with datasource [%s], skipping refresh of segment [%s]",
+                datasourceKey,
+                segmentId
+            );
+            return null;
+          } else {
+            dataSourceSegments.compute(
+                segmentId,
+                (segmentIdKey, segmentMetadata) -> {
+                  if (segmentMetadata == null) {
+                    log.warn("No segment [%s] found, skipping refresh", segmentId);
+                    return null;
+                  } else {
+                    long numRows = analysis.getNumRows();
+                    log.debug("Publishing segment schema. SegmentId [%s], RowSignature [%s], numRows [%d]", segmentId, rowSignature, numRows);
+                    Map<String, AggregatorFactory> aggregators = analysis.getAggregators();
+                    // cache the signature
+                    segmentSchemaCache.addInTransitSMQResult(segmentId, rowSignature, numRows);

Review Comment:
   Should this also include the aggregator factory?
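   Something along these lines, perhaps (hypothetical overload; this signature does not exist in the PR as written):
   ```java
   // Cache the aggregator factories alongside the row signature and row count.
   segmentSchemaCache.addInTransitSMQResult(segmentId, rowSignature, aggregators, numRows);
   ```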



##########
server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java:
##########
@@ -800,6 +757,62 @@ private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, fin
     return retVal;
   }
 
+  /**
+   * Action to be executed on the result of Segment metadata query.
+   * Returns if the segment metadata was updated.
+   */
+  protected boolean smqAction(
+      String dataSource,
+      SegmentId segmentId,
+      RowSignature rowSignature,
+      SegmentAnalysis analysis
+  )
+  {
+    log.info("Executing smq action.");

Review Comment:
   This log line does not add any value. 



##########
server/src/main/java/org/apache/druid/segment/metadata/CentralizedDatasourceSchemaConfig.java:
##########
@@ -27,38 +27,72 @@
  */
 public class CentralizedDatasourceSchemaConfig
 {
+  public static final String PROPERTY_PREFIX = "druid.centralizedDatasourceSchema";
+
+  public static final String SCHEMA_VERSION = "v1";
+
   @JsonProperty
   private boolean enabled = false;
+  @JsonProperty
+  private boolean backFillEnabled = true;

Review Comment:
   This should be moved into the testing block.
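   For illustration only, one shape that could take (the nested holder and its name are hypothetical, not this PR's actual API):
   ```java
   // Keep test-only knobs under a nested "testing" holder instead of top-level properties.
   @JsonProperty("testing")
   private Testing testing = new Testing();

   public static class Testing
   {
     @JsonProperty
     private boolean backFillEnabled = true;

     public boolean isBackFillEnabled()
     {
       return backFillEnabled;
     }
   }
   ```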



##########
server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java:
##########
@@ -711,7 +702,7 @@ private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, fin
       throw new ISE("'segments' must all match 'dataSource'!");
     }
 
-    log.debug("Refreshing metadata for datasource[%s].", dataSource);
+    log.info("Refreshing metadata for datasource[%s].", dataSource);

Review Comment:
   Let's change the debug to info.



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1088,6 +1123,174 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Starting polling of segment and schema table. latestSchemaId is [%s].", latestSchemaId.get());
+
+    ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();
+
+    // some databases such as PostgreSQL require auto-commit turned off
+    // to stream results back, enabling transactions disables auto-commit
+    //
+    // setting connection to read-only will allow some database such as MySQL
+    // to automatically use read-only transaction mode, further optimizing the query
+    final List<DataSegment> segments = connector.inReadOnlyTransaction(
+        new TransactionCallback<List<DataSegment>>()
+        {
+          @Override
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
+          {
+            return handle
+                .createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable()))
+                .setFetchSize(connector.getStreamingFetchSize())
+                .map(
+                    new ResultSetMapper<DataSegment>()
+                    {
+                      @Override
+                      public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                      {
+                        try {
+                          DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                          segmentStats.put(
+                              segment.getId(),
+                              new SegmentSchemaCache.SegmentStats(
+                                  (Long) r.getObject("schema_id"),
+                                  (Long) r.getObject("num_rows")
+                              )
+                          );
+                          return replaceWithExistingSegmentIfPresent(segment);
+                        }
+                        catch (IOException e) {
+                          log.makeAlert(e, "Failed to read segment from db.").emit();
+                          // If one entry in database is corrupted doPoll() should continue to work overall. See
+                          // filter by `Objects::nonNull` below in this method.
+                          return null;
+                        }
+                      }
+                    }
+                )
+                .list();
+          }
+        }
+    );
+
+    ConcurrentMap<Long, SchemaPayload> schemaMap = new ConcurrentHashMap<>();
+
+    String schemaPollQuery;

Review Comment:
   This `schemaPollQuery` should be final.



##########
server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java:
##########
@@ -245,102 +242,103 @@ public AbstractSegmentMetadataCache(
     this.emitter = emitter;
   }
 
-  private void startCacheExec()
+  protected void cacheExecLoop()
   {
-    cacheExec.submit(
-        () -> {
-          final Stopwatch stopwatch = Stopwatch.createStarted();
-          long lastRefresh = 0L;
-          long lastFailure = 0L;
-
-          try {
-            while (!Thread.currentThread().isInterrupted()) {
-              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-              final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-              try {
-                synchronized (lock) {
-                  final long nextRefreshNoFuzz = DateTimes
-                      .utc(lastRefresh)
-                      .plus(config.getMetadataRefreshPeriod())
-                      .getMillis();
-
-                  // Fuzz a bit to spread load out when we have multiple brokers.
-                  final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                  while (true) {
-                    // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
-                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
-                                                              .plus(config.getMetadataRefreshPeriod())
-                                                              .isAfterNow();
-
-                    if (isServerViewInitialized &&
-                        !wasRecentFailure &&
-                        (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
-                        (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
-                      // We need to do a refresh. Break out of the waiting loop.
-                      break;
-                    }
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    long lastRefresh = 0L;
+    long lastFailure = 0L;
 
-                    // lastFailure != 0L means exceptions happened before and there're some refresh work was not completed.
-                    // so that even if ServerView is initialized, we can't let broker complete initialization.
-                    if (isServerViewInitialized && lastFailure == 0L) {
-                      // Server view is initialized, but we don't need to do a refresh. Could happen if there are
-                      // no segments in the system yet. Just mark us as initialized, then.
-                      setInitializedAndReportInitTime(stopwatch);
-                    }
+    try {
+      refreshWaitCondition();
+      while (!Thread.currentThread().isInterrupted()) {
+        final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+        final Set<String> dataSourcesToRebuild = new TreeSet<>();
 
-                    // Wait some more, we'll wake up when it might be time to do another refresh.
-                    lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
-                  }
+        try {
+          synchronized (lock) {
+            final long nextRefreshNoFuzz = DateTimes
+                .utc(lastRefresh)
+                .plus(config.getMetadataRefreshPeriod())
+                .getMillis();
+
+            // Fuzz a bit to spread load out when we have multiple brokers.
+            final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+            while (true) {
+              // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
+              final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                        .plus(config.getMetadataRefreshPeriod())
+                                                        .isAfterNow();
+              if (isServerViewInitialized &&
+                  !wasRecentFailure &&
+                  (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
+                  (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
+                // We need to do a refresh. Break out of the waiting loop.
+                break;
+              }
 
-                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                  segmentsNeedingRefresh.clear();
+              // lastFailure != 0L means exceptions happened before and there're some refresh work was not completed.
+              // so that even if ServerView is initialized, we can't let broker complete initialization.
+              if (isServerViewInitialized && lastFailure == 0L) {
+                // Server view is initialized, but we don't need to do a refresh. Could happen if there are
+                // no segments in the system yet. Just mark us as initialized, then.
+                setInitializedAndReportInitTime(stopwatch);
+              }
 
-                  // Mutable segments need a refresh every period, since new columns could be added dynamically.
-                  segmentsNeedingRefresh.addAll(mutableSegments);
+              // Wait some more, we'll wake up when it might be time to do another refresh.
+              lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
+            }
 
-                  lastFailure = 0L;
-                  lastRefresh = System.currentTimeMillis();
-                  refreshImmediately = false;
-                }
+            segmentsToRefresh.addAll(segmentsNeedingRefresh);
+            segmentsNeedingRefresh.clear();
 
-                refresh(segmentsToRefresh, dataSourcesToRebuild);
+            // Mutable segments need a refresh every period, since new columns could be added dynamically.
+            segmentsNeedingRefresh.addAll(mutableSegments);
 
-                setInitializedAndReportInitTime(stopwatch);
-              }
-              catch (InterruptedException e) {
-                // Fall through.
-                throw e;
-              }
-              catch (Exception e) {
-                log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                synchronized (lock) {
-                  // Add our segments and datasources back to their refresh and rebuild lists.
-                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                  lastFailure = System.currentTimeMillis();
-                }
-              }
-            }
-          }
-          catch (InterruptedException e) {
-            // Just exit.
-          }
-          catch (Throwable e) {
-            // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
-            // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
-            log.makeAlert(e, "Metadata refresh failed permanently").emit();
-            throw e;
+            lastFailure = 0L;
+            lastRefresh = System.currentTimeMillis();
+            refreshImmediately = false;
           }
-          finally {
-            log.info("Metadata refresh stopped.");
+
+          log.debug("Executing refresh.");
+          refresh(segmentsToRefresh, dataSourcesToRebuild);
+
+          setInitializedAndReportInitTime(stopwatch);
+        }
+        catch (InterruptedException e) {
+          // Fall through.
+          throw e;
+        }
+        catch (Exception e) {
+          log.warn(e, "Metadata refresh failed, trying again soon.");
+
+          synchronized (lock) {
+            // Add our segments and datasources back to their refresh and rebuild lists.
+            segmentsNeedingRefresh.addAll(segmentsToRefresh);
+            dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+            lastFailure = System.currentTimeMillis();
           }
         }
-    );
+      }
+    }
+    catch (InterruptedException e) {
+      // Just exit.
+    }
+    catch (Throwable e) {
+      // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
+      // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
+      log.makeAlert(e, "Metadata refresh failed permanently").emit();
+      throw e;
+    }
+    finally {
+      log.info("Metadata refresh stopped.");
+    }
   }
 
+  public abstract void start() throws InterruptedException;

Review Comment:
   Please add some Javadoc to these methods.
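   For example (suggested wording only; the stop() signature below is assumed, since only start() appears in this hunk):
   ```java
   /**
    * Starts the cache. On the Broker this kicks off the refresh loop right away;
    * on the Coordinator the loop is only started once the node becomes the leader.
    */
   public abstract void start() throws InterruptedException;

   /**
    * Stops the cache and releases anything started by {@link #start()}.
    */
   public abstract void stop();
   ```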



##########
server/src/main/java/org/apache/druid/segment/metadata/CentralizedDatasourceSchemaConfig.java:
##########
@@ -27,38 +27,72 @@
  */
 public class CentralizedDatasourceSchemaConfig
 {
+  public static final String PROPERTY_PREFIX = "druid.centralizedDatasourceSchema";
+
+  public static final String SCHEMA_VERSION = "v1";
+
   @JsonProperty
   private boolean enabled = false;
+  @JsonProperty
+  private boolean backFillEnabled = true;
+  @JsonProperty
+  private long backFillPeriod = 60000;

Review Comment:
   This as well. 



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1088,6 +1123,174 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Starting polling of segment and schema table. latestSchemaId is [%s].", latestSchemaId.get());
+
+    ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();
+
+    // some databases such as PostgreSQL require auto-commit turned off
+    // to stream results back, enabling transactions disables auto-commit
+    //
+    // setting connection to read-only will allow some database such as MySQL
+    // to automatically use read-only transaction mode, further optimizing the query
+    final List<DataSegment> segments = connector.inReadOnlyTransaction(
+        new TransactionCallback<List<DataSegment>>()
+        {
+          @Override
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
+          {
+            return handle
+                .createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable()))
+                .setFetchSize(connector.getStreamingFetchSize())
+                .map(
+                    new ResultSetMapper<DataSegment>()
+                    {
+                      @Override
+                      public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                      {
+                        try {
+                          DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                          segmentStats.put(
+                              segment.getId(),
+                              new SegmentSchemaCache.SegmentStats(
+                                  (Long) r.getObject("schema_id"),
+                                  (Long) r.getObject("num_rows")
+                              )
+                          );
+                          return replaceWithExistingSegmentIfPresent(segment);
+                        }
+                        catch (IOException e) {
+                          log.makeAlert(e, "Failed to read segment from db.").emit();
+                          // If one entry in database is corrupted doPoll() should continue to work overall. See
+                          // filter by `Objects::nonNull` below in this method.
+                          return null;
+                        }
+                      }
+                    }
+                )
+                .list();
+          }
+        }
+    );
+
+    ConcurrentMap<Long, SchemaPayload> schemaMap = new ConcurrentHashMap<>();
+
+    String schemaPollQuery;
+
+    Long lastSchemaIdPrePoll = latestSchemaId.get();
+    if (lastSchemaIdPrePoll == null) {
+      log.info("Executing full schema refresh.");
+      schemaPollQuery =
+          StringUtils.format(
+              "SELECT id, payload FROM %s WHERE version = '%s'",
+              getSegmentSchemaTable(),
+              CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+          );
+    } else {
+      schemaPollQuery = StringUtils.format(
+          "SELECT id, payload FROM %s WHERE version = '%s' AND id > %s",
+          getSegmentSchemaTable(),
+          CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
+          lastSchemaIdPrePoll
+      );
+    }
+    String finalSchemaPollQuery = schemaPollQuery;
+
+    final AtomicReference<Long> maxPolledId = new AtomicReference<>();
+    maxPolledId.set(lastSchemaIdPrePoll);
+
+    connector.inReadOnlyTransaction(new TransactionCallback<Object>()
+    {
+      @Override
+      public Object inTransaction(Handle handle, TransactionStatus status)
+      {
+        return handle.createQuery(finalSchemaPollQuery)
+                     .setFetchSize(connector.getStreamingFetchSize())
+                     .map(new ResultSetMapper<Void>()
+                     {
+                       @Override
+                       public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                       {
+                         try {
+                           long id = r.getLong("id");
+
+                           if (maxPolledId.get() == null || id > maxPolledId.get()) {

Review Comment:
   Just track the max in a local variable and set the atomic variable once.
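   A rough sketch of that idea (fragment only; names follow the code above, and the single-element array is just one way to get a mutable local into the mapper):
   ```java
   // Track the running max in a simple holder while mapping rows...
   final long[] maxSeen = {lastSchemaIdPrePoll == null ? Long.MIN_VALUE : lastSchemaIdPrePoll};

   // ...inside ResultSetMapper.map(...):
   //   maxSeen[0] = Math.max(maxSeen[0], r.getLong("id"));

   // ...and publish it to the atomic reference once, after the poll completes.
   if (maxSeen[0] != Long.MIN_VALUE) {
     maxPolledId.set(maxSeen[0]);
   }
   ```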



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1088,6 +1123,174 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Starting polling of segment and schema table. latestSchemaId is [%s].", latestSchemaId.get());
+
+    ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();
+
+    // some databases such as PostgreSQL require auto-commit turned off
+    // to stream results back, enabling transactions disables auto-commit
+    //
+    // setting connection to read-only will allow some database such as MySQL
+    // to automatically use read-only transaction mode, further optimizing the query
+    final List<DataSegment> segments = connector.inReadOnlyTransaction(
+        new TransactionCallback<List<DataSegment>>()
+        {
+          @Override
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
+          {
+            return handle
+                .createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable()))
+                .setFetchSize(connector.getStreamingFetchSize())
+                .map(
+                    new ResultSetMapper<DataSegment>()
+                    {
+                      @Override
+                      public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                      {
+                        try {
+                          DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                          segmentStats.put(
+                              segment.getId(),
+                              new SegmentSchemaCache.SegmentStats(
+                                  (Long) r.getObject("schema_id"),
+                                  (Long) r.getObject("num_rows")
+                              )
+                          );
+                          return replaceWithExistingSegmentIfPresent(segment);
+                        }
+                        catch (IOException e) {
+                          log.makeAlert(e, "Failed to read segment from db.").emit();
+                          // If one entry in database is corrupted doPoll() should continue to work overall. See
+                          // filter by `Objects::nonNull` below in this method.
+                          return null;
+                        }
+                      }
+                    }
+                )
+                .list();
+          }
+        }
+    );
+
+    ConcurrentMap<Long, SchemaPayload> schemaMap = new ConcurrentHashMap<>();
+
+    String schemaPollQuery;
+
+    Long lastSchemaIdPrePoll = latestSchemaId.get();
+    if (lastSchemaIdPrePoll == null) {
+      log.info("Executing full schema refresh.");
+      schemaPollQuery =
+          StringUtils.format(
+              "SELECT id, payload FROM %s WHERE version = '%s'",
+              getSegmentSchemaTable(),
+              CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+          );
+    } else {
+      schemaPollQuery = StringUtils.format(
+          "SELECT id, payload FROM %s WHERE version = '%s' AND id > %s",
+          getSegmentSchemaTable(),
+          CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
+          lastSchemaIdPrePoll
+      );
+    }
+    String finalSchemaPollQuery = schemaPollQuery;
+
+    final AtomicReference<Long> maxPolledId = new AtomicReference<>();
+    maxPolledId.set(lastSchemaIdPrePoll);
+
+    connector.inReadOnlyTransaction(new TransactionCallback<Object>()
+    {
+      @Override
+      public Object inTransaction(Handle handle, TransactionStatus status)
+      {
+        return handle.createQuery(finalSchemaPollQuery)
+                     .setFetchSize(connector.getStreamingFetchSize())
+                     .map(new ResultSetMapper<Void>()
+                     {
+                       @Override
+                       public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                       {
+                         try {
+                           long id = r.getLong("id");
+
+                           if (maxPolledId.get() == null || id > maxPolledId.get()) {

Review Comment:
   Create a private nested class if lambdas are troubling.
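   A sketch of that refactor (class name and constructor are illustrative, imports omitted; error handling mirrors the anonymous mapper above):
   ```java
   private static class SchemaPayloadMapper implements ResultSetMapper<Void>
   {
     private final ObjectMapper jsonMapper;
     private final Map<Long, SchemaPayload> schemaMap;

     SchemaPayloadMapper(ObjectMapper jsonMapper, Map<Long, SchemaPayload> schemaMap)
     {
       this.jsonMapper = jsonMapper;
       this.schemaMap = schemaMap;
     }

     @Override
     public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
     {
       try {
         // Deserialize the schema payload and record it against its id.
         schemaMap.put(r.getLong("id"), jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class));
       }
       catch (IOException e) {
         // Alert and skip the row, as the anonymous mapper does today.
       }
       return null;
     }
   }
   ```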



##########
server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java:
##########
@@ -245,102 +242,103 @@ public AbstractSegmentMetadataCache(
     this.emitter = emitter;
   }
 
-  private void startCacheExec()
+  protected void cacheExecLoop()
   {
-    cacheExec.submit(
-        () -> {
-          final Stopwatch stopwatch = Stopwatch.createStarted();
-          long lastRefresh = 0L;
-          long lastFailure = 0L;
-
-          try {
-            while (!Thread.currentThread().isInterrupted()) {
-              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-              final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-              try {
-                synchronized (lock) {
-                  final long nextRefreshNoFuzz = DateTimes
-                      .utc(lastRefresh)
-                      .plus(config.getMetadataRefreshPeriod())
-                      .getMillis();
-
-                  // Fuzz a bit to spread load out when we have multiple brokers.
-                  final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                  while (true) {
-                    // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
-                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
-                                                              .plus(config.getMetadataRefreshPeriod())
-                                                              .isAfterNow();
-
-                    if (isServerViewInitialized &&
-                        !wasRecentFailure &&
-                        (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
-                        (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
-                      // We need to do a refresh. Break out of the waiting loop.
-                      break;
-                    }
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    long lastRefresh = 0L;
+    long lastFailure = 0L;
 
-                    // lastFailure != 0L means exceptions happened before and there're some refresh work was not completed.
-                    // so that even if ServerView is initialized, we can't let broker complete initialization.
-                    if (isServerViewInitialized && lastFailure == 0L) {
-                      // Server view is initialized, but we don't need to do a refresh. Could happen if there are
-                      // no segments in the system yet. Just mark us as initialized, then.
-                      setInitializedAndReportInitTime(stopwatch);
-                    }
+    try {
+      refreshWaitCondition();
+      while (!Thread.currentThread().isInterrupted()) {
+        final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+        final Set<String> dataSourcesToRebuild = new TreeSet<>();
 
-                    // Wait some more, we'll wake up when it might be time to do another refresh.
-                    lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
-                  }
+        try {
+          synchronized (lock) {
+            final long nextRefreshNoFuzz = DateTimes
+                .utc(lastRefresh)
+                .plus(config.getMetadataRefreshPeriod())
+                .getMillis();
+
+            // Fuzz a bit to spread load out when we have multiple brokers.
+            final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+            while (true) {
+              // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
+              final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                        .plus(config.getMetadataRefreshPeriod())
+                                                        .isAfterNow();
+              if (isServerViewInitialized &&
+                  !wasRecentFailure &&
+                  (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
+                  (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
+                // We need to do a refresh. Break out of the waiting loop.
+                break;
+              }
 
-                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                  segmentsNeedingRefresh.clear();
+              // lastFailure != 0L means exceptions happened before and there're some refresh work was not completed.
+              // so that even if ServerView is initialized, we can't let broker complete initialization.
+              if (isServerViewInitialized && lastFailure == 0L) {
+                // Server view is initialized, but we don't need to do a refresh. Could happen if there are
+                // no segments in the system yet. Just mark us as initialized, then.
+                setInitializedAndReportInitTime(stopwatch);
+              }
 
-                  // Mutable segments need a refresh every period, since new columns could be added dynamically.
-                  segmentsNeedingRefresh.addAll(mutableSegments);
+              // Wait some more, we'll wake up when it might be time to do another refresh.
+              lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
+            }
 
-                  lastFailure = 0L;
-                  lastRefresh = System.currentTimeMillis();
-                  refreshImmediately = false;
-                }
+            segmentsToRefresh.addAll(segmentsNeedingRefresh);
+            segmentsNeedingRefresh.clear();
 
-                refresh(segmentsToRefresh, dataSourcesToRebuild);
+            // Mutable segments need a refresh every period, since new columns could be added dynamically.
+            segmentsNeedingRefresh.addAll(mutableSegments);
 
-                setInitializedAndReportInitTime(stopwatch);
-              }
-              catch (InterruptedException e) {
-                // Fall through.
-                throw e;
-              }
-              catch (Exception e) {
-                log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                synchronized (lock) {
-                  // Add our segments and datasources back to their refresh and rebuild lists.
-                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                  lastFailure = System.currentTimeMillis();
-                }
-              }
-            }
-          }
-          catch (InterruptedException e) {
-            // Just exit.
-          }
-          catch (Throwable e) {
-            // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
-            // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
-            log.makeAlert(e, "Metadata refresh failed permanently").emit();
-            throw e;
+            lastFailure = 0L;
+            lastRefresh = System.currentTimeMillis();
+            refreshImmediately = false;
           }
-          finally {
-            log.info("Metadata refresh stopped.");
+
+          log.debug("Executing refresh.");

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
