clintropolis commented on a change in pull request #10020:
URL: https://github.com/apache/druid/pull/10020#discussion_r439690292
##########
File path:
sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,130 @@ public DruidSchema(
public void start() throws InterruptedException
{
cacheExec.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
- final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
- try {
- synchronized (lock) {
- final long nextRefreshNoFuzz = DateTimes
- .utc(lastRefresh)
- .plus(config.getMetadataRefreshPeriod())
- .getMillis();
-
- // Fuzz a bit to spread load out when we have multiple
brokers.
- final long nextRefresh = nextRefreshNoFuzz + (long)
((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
- while (true) {
- // Do not refresh if it's too soon after a failure (to
avoid rapid cycles of failure).
- final boolean wasRecentFailure =
DateTimes.utc(lastFailure)
-
.plus(config.getMetadataRefreshPeriod())
- .isAfterNow();
-
- if (isServerViewInitialized &&
- !wasRecentFailure &&
- (!segmentsNeedingRefresh.isEmpty() ||
!dataSourcesNeedingRebuild.isEmpty()) &&
- (refreshImmediately || nextRefresh <
System.currentTimeMillis())) {
- // We need to do a refresh. Break out of the waiting
loop.
- break;
- }
-
- if (isServerViewInitialized) {
- // Server view is initialized, but we don't need to do
a refresh. Could happen if there are
- // no segments in the system yet. Just mark us as
initialized, then.
- initialized.countDown();
- }
-
- // Wait some more, we'll wake up when it might be time
to do another refresh.
- lock.wait(Math.max(1, nextRefresh -
System.currentTimeMillis()));
+ () -> {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+ final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+ try {
+ synchronized (lock) {
+ final long nextRefreshNoFuzz = DateTimes
+ .utc(lastRefresh)
+ .plus(config.getMetadataRefreshPeriod())
+ .getMillis();
+
+ // Fuzz a bit to spread load out when we have multiple
brokers.
+ final long nextRefresh = nextRefreshNoFuzz + (long)
((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+ while (true) {
+ // Do not refresh if it's too soon after a failure (to
avoid rapid cycles of failure).
+ final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+
.plus(config.getMetadataRefreshPeriod())
+ .isAfterNow();
+
+ if (isServerViewInitialized &&
+ !wasRecentFailure &&
+ (!segmentsNeedingRefresh.isEmpty() ||
!dataSourcesNeedingRebuild.isEmpty()) &&
+ (refreshImmediately || nextRefresh <
System.currentTimeMillis())) {
+ // We need to do a refresh. Break out of the waiting
loop.
+ break;
}
- segmentsToRefresh.addAll(segmentsNeedingRefresh);
- segmentsNeedingRefresh.clear();
-
- // Mutable segments need a refresh every period, since new
columns could be added dynamically.
- segmentsNeedingRefresh.addAll(mutableSegments);
+ if (isServerViewInitialized) {
+ // Server view is initialized, but we don't need to do a
refresh. Could happen if there are
+ // no segments in the system yet. Just mark us as
initialized, then.
+ initialized.countDown();
+ }
- lastFailure = 0L;
- lastRefresh = System.currentTimeMillis();
- refreshImmediately = false;
+ // Wait some more, we'll wake up when it might be time to
do another refresh.
+ lock.wait(Math.max(1, nextRefresh -
System.currentTimeMillis()));
}
- // Refresh the segments.
- final Set<SegmentId> refreshed =
refreshSegments(segmentsToRefresh);
+ segmentsToRefresh.addAll(segmentsNeedingRefresh);
+ segmentsNeedingRefresh.clear();
- synchronized (lock) {
- // Add missing segments back to the refresh list.
-
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
+ // Mutable segments need a refresh every period, since new
columns could be added dynamically.
+ segmentsNeedingRefresh.addAll(mutableSegments);
- // Compute the list of dataSources to rebuild tables for.
- dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
- refreshed.forEach(segment ->
dataSourcesToRebuild.add(segment.getDataSource()));
- dataSourcesNeedingRebuild.clear();
+ lastFailure = 0L;
+ lastRefresh = System.currentTimeMillis();
+ refreshImmediately = false;
+ }
- lock.notifyAll();
- }
+ // Refresh the segments.
+ final Set<SegmentId> refreshed =
refreshSegments(segmentsToRefresh);
- // Rebuild the dataSources.
- for (String dataSource : dataSourcesToRebuild) {
- final DruidTable druidTable = buildDruidTable(dataSource);
- final DruidTable oldTable = tables.put(dataSource,
druidTable);
- if (oldTable == null ||
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
- log.info("dataSource [%s] has new signature: %s.",
dataSource, druidTable.getRowSignature());
- } else {
- log.debug("dataSource [%s] signature is unchanged.",
dataSource);
- }
- }
+ synchronized (lock) {
+ // Add missing segments back to the refresh list.
+
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
- initialized.countDown();
- }
- catch (InterruptedException e) {
- // Fall through.
- throw e;
+ // Compute the list of dataSources to rebuild tables for.
+ dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
+ refreshed.forEach(segment ->
dataSourcesToRebuild.add(segment.getDataSource()));
+ dataSourcesNeedingRebuild.clear();
+
+ lock.notifyAll();
}
- catch (Exception e) {
- log.warn(e, "Metadata refresh failed, trying again soon.");
-
- synchronized (lock) {
- // Add our segments and dataSources back to their refresh
and rebuild lists.
- segmentsNeedingRefresh.addAll(segmentsToRefresh);
- dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
- lastFailure = System.currentTimeMillis();
- lock.notifyAll();
+
+ // Rebuild the dataSources.
+ for (String dataSource : dataSourcesToRebuild) {
+ final DruidTable druidTable = buildDruidTable(dataSource);
+ final DruidTable oldTable = tables.put(dataSource,
druidTable);
+ if (oldTable == null ||
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+ log.info("dataSource [%s] has new signature: %s.",
dataSource, druidTable.getRowSignature());
+ } else {
+ log.debug("dataSource [%s] signature is unchanged.",
dataSource);
}
}
+
+ initialized.countDown();
+ }
+ catch (InterruptedException e) {
+ // Fall through.
+ throw e;
+ }
+ catch (Exception e) {
+ log.warn(e, "Metadata refresh failed, trying again soon.");
+
+ synchronized (lock) {
+ // Add our segments and dataSources back to their refresh
and rebuild lists.
+ segmentsNeedingRefresh.addAll(segmentsToRefresh);
+ dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+ lastFailure = System.currentTimeMillis();
+ lock.notifyAll();
+ }
}
- }
- catch (InterruptedException e) {
- // Just exit.
- }
- catch (Throwable e) {
- // Throwables that fall out to here (not caught by an inner
try/catch) are potentially gnarly, like
- // OOMEs. Anyway, let's just emit an alert and stop refreshing
metadata.
- log.makeAlert(e, "Metadata refresh failed permanently").emit();
- throw e;
- }
- finally {
- log.info("Metadata refresh stopped.");
}
}
+ catch (InterruptedException e) {
+ // Just exit.
+ }
+ catch (Throwable e) {
+ // Throwables that fall out to here (not caught by an inner
try/catch) are potentially gnarly, like
+ // OOMEs. Anyway, let's just emit an alert and stop refreshing
metadata.
+ log.makeAlert(e, "Metadata refresh failed permanently").emit();
+ throw e;
+ }
+ finally {
+ log.info("Metadata refresh stopped.");
+ }
+ }
+ );
+
+ ScheduledExecutors.scheduleWithFixedDelay(
+ localSegmentExec,
+ config.getMetadataRefreshPeriod().toStandardDuration(),
+ config.getMetadataRefreshPeriod().toStandardDuration(),
+ () -> {
+ synchronized (lock) {
+ // refresh known broadcast segments
+ Set<String> localSegmentDatasources =
segmentManager.getDataSourceNames();
+ dataSourcesNeedingRebuild.addAll(localSegmentDatasources);
+ broadcastDatasources.clear();
+ broadcastDatasources.addAll(localSegmentDatasources);
Review comment:
added a comment to clarify that, since this code only runs on the broker
and the broker can only have broadcast segments, it's a safe assumption.
If brokers ever load normal segments, we might need to reconsider how
this works, and it might be more appropriate longer term to use the load rules
instead of inferring from context, but it should be ok for now, I think.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]