gianm commented on a change in pull request #10020:
URL: https://github.com/apache/druid/pull/10020#discussion_r440600939



##########
File path: processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * {@link TableDataSource} variant for globally available 'broadcast' segments. If bound to a
+ * {@link org.apache.druid.segment.join.JoinableFactory} that can create an
+ * {@link org.apache.druid.segment.join.table.IndexedTable} using DruidBinders.joinableFactoryBinder, this allows
+ * optimal usage of segments using this DataSource type in join operations (because they are global), and so can be pushed
+ * down to historicals as a {@link JoinDataSource}, instead of requiring a subquery join using
+ * {@link InlineDataSource} to construct an {@link org.apache.druid.segment.join.table.IndexedTable} on the fly on the
+ * broker. Because it is also a {@link TableDataSource}, when queried directly, or on the left hand side of a join,
+ * they will be treated as any normal segment.
+ */
+@JsonTypeName("global")
+public class GlobalTableDataSource extends TableDataSource
+{
+  @JsonCreator
+  public GlobalTableDataSource(@JsonProperty("name") String name)
+  {
+    super(name);
+  }
+
+  @Override
+  public boolean isCacheable()
+  {
+    return false;

Review comment:
       Why shouldn't it be cacheable?

##########
File path: processing/src/main/java/org/apache/druid/query/TableDataSource.java
##########
@@ -102,6 +102,11 @@ public final boolean equals(Object o)
       return false;
     }
 
+    if ((o instanceof GlobalTableDataSource || this instanceof GlobalTableDataSource) &&
+        !getClass().equals(o.getClass())) {

Review comment:
       I think you can make this a little less gross by replacing this `equals` impl (and the one in `GlobalTableDataSource`) with a new auto-generated one that checks `getClass()`.
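
       A minimal sketch of what a `getClass()`-based `equals` could look like, not the code from this PR. `TableSource` and `GlobalTableSource` are hypothetical stand-ins for `TableDataSource` and `GlobalTableDataSource`, modeling only the `name` field, and the sketch assumes the subclass adds no fields of its own so it can inherit `equals` and `hashCode`:

```java
import java.util.Objects;

// Hypothetical stand-in for TableDataSource; only the name field is modeled.
class TableSource
{
  private final String name;

  TableSource(String name)
  {
    this.name = Objects.requireNonNull(name, "name");
  }

  String getName()
  {
    return name;
  }

  // Comparing getClass() instead of using instanceof keeps equals symmetric
  // across the class hierarchy without special-casing any subclass.
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    TableSource that = (TableSource) o;
    return name.equals(that.name);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(name);
  }
}

// Hypothetical stand-in for GlobalTableDataSource. It adds no fields, so it
// inherits equals and hashCode unchanged.
class GlobalTableSource extends TableSource
{
  GlobalTableSource(String name)
  {
    super(name);
  }
}

class EqualsSketch
{
  public static void main(String[] args)
  {
    TableSource table = new TableSource("foo");
    TableSource global = new GlobalTableSource("foo");

    System.out.println(table.equals(new TableSource("foo")));        // true
    System.out.println(global.equals(new GlobalTableSource("foo"))); // true
    System.out.println(table.equals(global));                        // false
    System.out.println(global.equals(table));                        // false
  }
}
```

       With this shape, two sources with the same name but different classes compare unequal in both directions, which is what the explicit `instanceof GlobalTableDataSource` check was guarding against.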

##########
File path: processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * {@link TableDataSource} variant for globally available 'broadcast' segments. If bound to a
+ * {@link org.apache.druid.segment.join.JoinableFactory} that can create an
+ * {@link org.apache.druid.segment.join.table.IndexedTable} using DruidBinders.joinableFactoryBinder, this allows
+ * optimal usage of segments using this DataSource type in join operations (because they are global), and so can be pushed
+ * down to historicals as a {@link JoinDataSource}, instead of requiring a subquery join using
+ * {@link InlineDataSource} to construct an {@link org.apache.druid.segment.join.table.IndexedTable} on the fly on the
+ * broker. Because it is also a {@link TableDataSource}, when queried directly, or on the left hand side of a join,
+ * they will be treated as any normal segment.

Review comment:
       "datasource" makes more sense here than "segment".

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,132 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              while (!Thread.currentThread().isInterrupted()) {
-                final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-                final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-                try {
-                  synchronized (lock) {
-                    final long nextRefreshNoFuzz = DateTimes
-                        .utc(lastRefresh)
-                        .plus(config.getMetadataRefreshPeriod())
-                        .getMillis();
-
-                    // Fuzz a bit to spread load out when we have multiple 
brokers.
-                    final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                    while (true) {
-                      // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
-                      final boolean wasRecentFailure = 
DateTimes.utc(lastFailure)
-                                                                
.plus(config.getMetadataRefreshPeriod())
-                                                                .isAfterNow();
-
-                      if (isServerViewInitialized &&
-                          !wasRecentFailure &&
-                          (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
-                          (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
-                        // We need to do a refresh. Break out of the waiting 
loop.
-                        break;
-                      }
-
-                      if (isServerViewInitialized) {
-                        // Server view is initialized, but we don't need to do 
a refresh. Could happen if there are
-                        // no segments in the system yet. Just mark us as 
initialized, then.
-                        initialized.countDown();
-                      }
-
-                      // Wait some more, we'll wake up when it might be time 
to do another refresh.
-                      lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+              try {
+                synchronized (lock) {
+                  final long nextRefreshNoFuzz = DateTimes
+                      .utc(lastRefresh)
+                      .plus(config.getMetadataRefreshPeriod())
+                      .getMillis();
+
+                  // Fuzz a bit to spread load out when we have multiple 
brokers.
+                  final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+                  while (true) {
+                    // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
+                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                              
.plus(config.getMetadataRefreshPeriod())
+                                                              .isAfterNow();
+
+                    if (isServerViewInitialized &&
+                        !wasRecentFailure &&
+                        (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
+                        (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
+                      // We need to do a refresh. Break out of the waiting 
loop.
+                      break;
                     }
 
-                    segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                    segmentsNeedingRefresh.clear();
-
-                    // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
-                    segmentsNeedingRefresh.addAll(mutableSegments);
+                    if (isServerViewInitialized) {
+                      // Server view is initialized, but we don't need to do a 
refresh. Could happen if there are
+                      // no segments in the system yet. Just mark us as 
initialized, then.
+                      initialized.countDown();
+                    }
 
-                    lastFailure = 0L;
-                    lastRefresh = System.currentTimeMillis();
-                    refreshImmediately = false;
+                    // Wait some more, we'll wake up when it might be time to 
do another refresh.
+                    lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
                   }
 
-                  // Refresh the segments.
-                  final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
+                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
+                  segmentsNeedingRefresh.clear();
 
-                  synchronized (lock) {
-                    // Add missing segments back to the refresh list.
-                    
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
+                  // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
+                  segmentsNeedingRefresh.addAll(mutableSegments);
 
-                    // Compute the list of dataSources to rebuild tables for.
-                    dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
-                    refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
-                    dataSourcesNeedingRebuild.clear();
+                  lastFailure = 0L;
+                  lastRefresh = System.currentTimeMillis();
+                  refreshImmediately = false;
+                }
 
-                    lock.notifyAll();
-                  }
+                // Refresh the segments.
+                final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
 
-                  // Rebuild the dataSources.
-                  for (String dataSource : dataSourcesToRebuild) {
-                    final DruidTable druidTable = buildDruidTable(dataSource);
-                    final DruidTable oldTable = tables.put(dataSource, 
druidTable);
-                    if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-                      log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
-                    } else {
-                      log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
-                    }
-                  }
+                synchronized (lock) {
+                  // Add missing segments back to the refresh list.
+                  
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
 
-                  initialized.countDown();
-                }
-                catch (InterruptedException e) {
-                  // Fall through.
-                  throw e;
+                  // Compute the list of dataSources to rebuild tables for.
+                  dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
+                  refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
+                  dataSourcesNeedingRebuild.clear();
+
+                  lock.notifyAll();
                 }
-                catch (Exception e) {
-                  log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                  synchronized (lock) {
-                    // Add our segments and dataSources back to their refresh 
and rebuild lists.
-                    segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                    dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                    lastFailure = System.currentTimeMillis();
-                    lock.notifyAll();
+
+                // Rebuild the dataSources.
+                for (String dataSource : dataSourcesToRebuild) {
+                  final DruidTable druidTable = buildDruidTable(dataSource);
+                  final DruidTable oldTable = tables.put(dataSource, 
druidTable);
+                  if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+                    log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
+                  } else {
+                    log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
                   }
                 }
+
+                initialized.countDown();
+              }
+              catch (InterruptedException e) {
+                // Fall through.
+                throw e;
+              }
+              catch (Exception e) {
+                log.warn(e, "Metadata refresh failed, trying again soon.");
+
+                synchronized (lock) {
+                  // Add our segments and dataSources back to their refresh 
and rebuild lists.
+                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
+                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+                  lastFailure = System.currentTimeMillis();
+                  lock.notifyAll();
+                }
               }
-            }
-            catch (InterruptedException e) {
-              // Just exit.
-            }
-            catch (Throwable e) {
-              // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
-              // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
-              log.makeAlert(e, "Metadata refresh failed permanently").emit();
-              throw e;
-            }
-            finally {
-              log.info("Metadata refresh stopped.");
             }
           }
+          catch (InterruptedException e) {
+            // Just exit.
+          }
+          catch (Throwable e) {
+            // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
+            // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
+            log.makeAlert(e, "Metadata refresh failed permanently").emit();
+            throw e;
+          }
+          finally {
+            log.info("Metadata refresh stopped.");
+          }
+        }
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(
+        localSegmentExec,
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        () -> {
+          synchronized (lock) {
+            // refresh known broadcast segments. Since DruidSchema is only present on the broker, any segment we have
+            // locally in the SegmentManager must be broadcast datasources. This could potentially be replaced in the
+            // future by fetching load rules from the coordinator
+            Set<String> localSegmentDatasources = segmentManager.getDataSourceNames();

Review comment:
       `localSegmentDataSources` would be more consistent spelling, I think.

##########
File path: processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class GlobalTableDataSourceTest
+{
+  private static final GlobalTableDataSource GLOBAL_TABLE_DATA_SOURCE = new GlobalTableDataSource("foo");
+
+  @Test
+  public void testEquals()

Review comment:
       Please add a test for nonequality with a TableDataSource of the same name.

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,132 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              while (!Thread.currentThread().isInterrupted()) {
-                final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-                final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-                try {
-                  synchronized (lock) {
-                    final long nextRefreshNoFuzz = DateTimes
-                        .utc(lastRefresh)
-                        .plus(config.getMetadataRefreshPeriod())
-                        .getMillis();
-
-                    // Fuzz a bit to spread load out when we have multiple 
brokers.
-                    final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                    while (true) {
-                      // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
-                      final boolean wasRecentFailure = 
DateTimes.utc(lastFailure)
-                                                                
.plus(config.getMetadataRefreshPeriod())
-                                                                .isAfterNow();
-
-                      if (isServerViewInitialized &&
-                          !wasRecentFailure &&
-                          (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
-                          (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
-                        // We need to do a refresh. Break out of the waiting 
loop.
-                        break;
-                      }
-
-                      if (isServerViewInitialized) {
-                        // Server view is initialized, but we don't need to do 
a refresh. Could happen if there are
-                        // no segments in the system yet. Just mark us as 
initialized, then.
-                        initialized.countDown();
-                      }
-
-                      // Wait some more, we'll wake up when it might be time 
to do another refresh.
-                      lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+              try {
+                synchronized (lock) {
+                  final long nextRefreshNoFuzz = DateTimes
+                      .utc(lastRefresh)
+                      .plus(config.getMetadataRefreshPeriod())
+                      .getMillis();
+
+                  // Fuzz a bit to spread load out when we have multiple 
brokers.
+                  final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+                  while (true) {
+                    // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
+                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                              
.plus(config.getMetadataRefreshPeriod())
+                                                              .isAfterNow();
+
+                    if (isServerViewInitialized &&
+                        !wasRecentFailure &&
+                        (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
+                        (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
+                      // We need to do a refresh. Break out of the waiting 
loop.
+                      break;
                     }
 
-                    segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                    segmentsNeedingRefresh.clear();
-
-                    // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
-                    segmentsNeedingRefresh.addAll(mutableSegments);
+                    if (isServerViewInitialized) {
+                      // Server view is initialized, but we don't need to do a 
refresh. Could happen if there are
+                      // no segments in the system yet. Just mark us as 
initialized, then.
+                      initialized.countDown();
+                    }
 
-                    lastFailure = 0L;
-                    lastRefresh = System.currentTimeMillis();
-                    refreshImmediately = false;
+                    // Wait some more, we'll wake up when it might be time to 
do another refresh.
+                    lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
                   }
 
-                  // Refresh the segments.
-                  final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
+                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
+                  segmentsNeedingRefresh.clear();
 
-                  synchronized (lock) {
-                    // Add missing segments back to the refresh list.
-                    
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
+                  // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
+                  segmentsNeedingRefresh.addAll(mutableSegments);
 
-                    // Compute the list of dataSources to rebuild tables for.
-                    dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
-                    refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
-                    dataSourcesNeedingRebuild.clear();
+                  lastFailure = 0L;
+                  lastRefresh = System.currentTimeMillis();
+                  refreshImmediately = false;
+                }
 
-                    lock.notifyAll();
-                  }
+                // Refresh the segments.
+                final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
 
-                  // Rebuild the dataSources.
-                  for (String dataSource : dataSourcesToRebuild) {
-                    final DruidTable druidTable = buildDruidTable(dataSource);
-                    final DruidTable oldTable = tables.put(dataSource, 
druidTable);
-                    if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-                      log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
-                    } else {
-                      log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
-                    }
-                  }
+                synchronized (lock) {
+                  // Add missing segments back to the refresh list.
+                  
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
 
-                  initialized.countDown();
-                }
-                catch (InterruptedException e) {
-                  // Fall through.
-                  throw e;
+                  // Compute the list of dataSources to rebuild tables for.
+                  dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
+                  refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
+                  dataSourcesNeedingRebuild.clear();
+
+                  lock.notifyAll();
                 }
-                catch (Exception e) {
-                  log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                  synchronized (lock) {
-                    // Add our segments and dataSources back to their refresh 
and rebuild lists.
-                    segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                    dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                    lastFailure = System.currentTimeMillis();
-                    lock.notifyAll();
+
+                // Rebuild the dataSources.
+                for (String dataSource : dataSourcesToRebuild) {
+                  final DruidTable druidTable = buildDruidTable(dataSource);
+                  final DruidTable oldTable = tables.put(dataSource, 
druidTable);
+                  if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+                    log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
+                  } else {
+                    log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
                   }
                 }
+
+                initialized.countDown();
+              }
+              catch (InterruptedException e) {
+                // Fall through.
+                throw e;
+              }
+              catch (Exception e) {
+                log.warn(e, "Metadata refresh failed, trying again soon.");
+
+                synchronized (lock) {
+                  // Add our segments and dataSources back to their refresh 
and rebuild lists.
+                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
+                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+                  lastFailure = System.currentTimeMillis();
+                  lock.notifyAll();
+                }
               }
-            }
-            catch (InterruptedException e) {
-              // Just exit.
-            }
-            catch (Throwable e) {
-              // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
-              // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
-              log.makeAlert(e, "Metadata refresh failed permanently").emit();
-              throw e;
-            }
-            finally {
-              log.info("Metadata refresh stopped.");
             }
           }
+          catch (InterruptedException e) {
+            // Just exit.
+          }
+          catch (Throwable e) {
+            // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
+            // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
+            log.makeAlert(e, "Metadata refresh failed permanently").emit();
+            throw e;
+          }
+          finally {
+            log.info("Metadata refresh stopped.");
+          }
+        }
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(
+        localSegmentExec,
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        () -> {
+          synchronized (lock) {
+            // refresh known broadcast segments. Since DruidSchema is only present on the broker, any segment we have
+            // locally in the SegmentManager must be broadcast datasources. This could potentially be replaced in the
+            // future by fetching load rules from the coordinator
+            Set<String> localSegmentDatasources = segmentManager.getDataSourceNames();
+            dataSourcesNeedingRebuild.addAll(localSegmentDatasources);

Review comment:
       Why do we need to rebuild them all continuously?

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -122,6 +128,8 @@
   // All dataSources that need tables regenerated.
   private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
 
+  private final Set<String> broadcastDatasources = new HashSet<>();

Review comment:
       `broadcastDataSources` is more consistent spelling. Please add a comment too.

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,132 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              while (!Thread.currentThread().isInterrupted()) {
-                final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-                final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-                try {
-                  synchronized (lock) {
-                    final long nextRefreshNoFuzz = DateTimes
-                        .utc(lastRefresh)
-                        .plus(config.getMetadataRefreshPeriod())
-                        .getMillis();
-
-                    // Fuzz a bit to spread load out when we have multiple brokers.
-                    final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                    while (true) {
-                      // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
-                      final boolean wasRecentFailure = DateTimes.utc(lastFailure)
-                                                                .plus(config.getMetadataRefreshPeriod())
-                                                                .isAfterNow();
-
-                      if (isServerViewInitialized &&
-                          !wasRecentFailure &&
-                          (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
-                          (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
-                        // We need to do a refresh. Break out of the waiting loop.
-                        break;
-                      }
-
-                      if (isServerViewInitialized) {
-                        // Server view is initialized, but we don't need to do a refresh. Could happen if there are
-                        // no segments in the system yet. Just mark us as initialized, then.
-                        initialized.countDown();
-                      }
-
-                      // Wait some more, we'll wake up when it might be time to do another refresh.
-                      lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+              try {
+                synchronized (lock) {
+                  final long nextRefreshNoFuzz = DateTimes
+                      .utc(lastRefresh)
+                      .plus(config.getMetadataRefreshPeriod())
+                      .getMillis();
+
+                  // Fuzz a bit to spread load out when we have multiple brokers.
+                  final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+                  while (true) {
+                    // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
+                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                              .plus(config.getMetadataRefreshPeriod())
+                                                              .isAfterNow();
+
+                    if (isServerViewInitialized &&
+                        !wasRecentFailure &&
+                        (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
+                        (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
+                      // We need to do a refresh. Break out of the waiting loop.
+                      break;
                     }
 
-                    segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                    segmentsNeedingRefresh.clear();
-
-                    // Mutable segments need a refresh every period, since new columns could be added dynamically.
-                    segmentsNeedingRefresh.addAll(mutableSegments);
+                    if (isServerViewInitialized) {
+                      // Server view is initialized, but we don't need to do a refresh. Could happen if there are
+                      // no segments in the system yet. Just mark us as initialized, then.
+                      initialized.countDown();
+                    }
 
-                    lastFailure = 0L;
-                    lastRefresh = System.currentTimeMillis();
-                    refreshImmediately = false;
+                    // Wait some more, we'll wake up when it might be time to do another refresh.
+                    lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
                   }
 
-                  // Refresh the segments.
-                  final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);
+                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
+                  segmentsNeedingRefresh.clear();
 
-                  synchronized (lock) {
-                    // Add missing segments back to the refresh list.
-                    segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
+                  // Mutable segments need a refresh every period, since new columns could be added dynamically.
+                  segmentsNeedingRefresh.addAll(mutableSegments);
 
-                    // Compute the list of dataSources to rebuild tables for.
-                    dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
-                    refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
-                    dataSourcesNeedingRebuild.clear();
+                  lastFailure = 0L;
+                  lastRefresh = System.currentTimeMillis();
+                  refreshImmediately = false;
+                }
 
-                    lock.notifyAll();
-                  }
+                // Refresh the segments.
+                final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);
 
-                  // Rebuild the dataSources.
-                  for (String dataSource : dataSourcesToRebuild) {
-                    final DruidTable druidTable = buildDruidTable(dataSource);
-                    final DruidTable oldTable = tables.put(dataSource, druidTable);
-                    if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-                      log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
-                    } else {
-                      log.debug("dataSource [%s] signature is unchanged.", dataSource);
-                    }
-                  }
+                synchronized (lock) {
+                  // Add missing segments back to the refresh list.
+                  segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
 
-                  initialized.countDown();
-                }
-                catch (InterruptedException e) {
-                  // Fall through.
-                  throw e;
+                  // Compute the list of dataSources to rebuild tables for.
+                  dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
+                  refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
+                  dataSourcesNeedingRebuild.clear();
+
+                  lock.notifyAll();
                 }
-                catch (Exception e) {
-                  log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                  synchronized (lock) {
-                    // Add our segments and dataSources back to their refresh and rebuild lists.
-                    segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                    dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                    lastFailure = System.currentTimeMillis();
-                    lock.notifyAll();
+
+                // Rebuild the dataSources.
+                for (String dataSource : dataSourcesToRebuild) {
+                  final DruidTable druidTable = buildDruidTable(dataSource);
+                  final DruidTable oldTable = tables.put(dataSource, druidTable);
+                  if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+                    log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
+                  } else {
+                    log.debug("dataSource [%s] signature is unchanged.", dataSource);
                   }
                 }
+
+                initialized.countDown();
+              }
+              catch (InterruptedException e) {
+                // Fall through.
+                throw e;
+              }
+              catch (Exception e) {
+                log.warn(e, "Metadata refresh failed, trying again soon.");
+
+                synchronized (lock) {
+                  // Add our segments and dataSources back to their refresh and rebuild lists.
+                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
+                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+                  lastFailure = System.currentTimeMillis();
+                  lock.notifyAll();
+                }
               }
-            }
-            catch (InterruptedException e) {
-              // Just exit.
-            }
-            catch (Throwable e) {
-              // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
-              // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
-              log.makeAlert(e, "Metadata refresh failed permanently").emit();
-              throw e;
-            }
-            finally {
-              log.info("Metadata refresh stopped.");
             }
           }
+          catch (InterruptedException e) {
+            // Just exit.
+          }
+          catch (Throwable e) {
+            // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
+            // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
+            log.makeAlert(e, "Metadata refresh failed permanently").emit();
+            throw e;
+          }
+          finally {
+            log.info("Metadata refresh stopped.");
+          }
+        }
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(

Review comment:
       Why not do this as part of the loop in the main thread?

##########
File path: processing/src/main/java/org/apache/druid/query/DataSource.java
##########
@@ -35,7 +35,8 @@
     @JsonSubTypes.Type(value = UnionDataSource.class, name = "union"),
     @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
     @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
-    @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline")
+    @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"),
+    @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "global")

Review comment:
       1. I think `globalTable` would be a better name. We only have one chance
          to get it right!
       2. IMO, for consistency between SQL and native queries, we'll need to
          either transparently globalify the regular `table` type (perhaps with a
          rewrite step like ClientQuerySegmentWalker's inlining?) or document the
          `globalTable` type. I think the former is nicer, because the latter
          comes with too many caveats (you have to make sure to use it only under
          the proper conditions).
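
       To illustrate the "transparently globalify" option, here is a minimal
       sketch of what such a rewrite step might look like. The nested classes
       are hypothetical stand-ins, not Druid's real `TableDataSource` or
       `GlobalTableDataSource`, and the `globalify` helper and its
       `broadcastTables` parameter are assumptions made only for this example.

```java
import java.util.Collections;
import java.util.Set;

final class GlobalifySketch {
  // Hypothetical stand-in for a regular table data source (not the Druid class).
  static class TableDataSource {
    final String name;

    TableDataSource(String name) {
      this.name = name;
    }

    boolean isGlobal() {
      return false;
    }
  }

  // Hypothetical stand-in for the globally available variant.
  static class GlobalTableDataSource extends TableDataSource {
    GlobalTableDataSource(String name) {
      super(name);
    }

    @Override
    boolean isGlobal() {
      return true;
    }
  }

  // Rewrite a plain table reference to its global variant when the table is
  // known to be broadcast; otherwise return the input unchanged.
  static TableDataSource globalify(TableDataSource source, Set<String> broadcastTables) {
    if (!source.isGlobal() && broadcastTables.contains(source.name)) {
      return new GlobalTableDataSource(source.name);
    }
    return source;
  }

  public static void main(String[] args) {
    Set<String> broadcastTables = Collections.singleton("countries");
    System.out.println(globalify(new TableDataSource("countries"), broadcastTables).isGlobal());
    System.out.println(globalify(new TableDataSource("events"), broadcastTables).isGlobal());
  }
}
```

       With a step like this, users would keep writing the regular `table`
       type and would never need to know when the global variant is safe to use.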

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -616,6 +631,12 @@ private DruidTable buildDruidTable(final String dataSource)
 
       final RowSignature.Builder builder = RowSignature.builder();
       columnTypes.forEach(builder::add);
+      if (broadcastDatasources.contains(dataSource)) {

Review comment:
       Tiny nit: the logic is laid out a bit oddly here. It would be clearer to
emphasize what's different by creating the dataSource inside the if block and
the DruidTable outside of it.
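
       A rough sketch of the layout suggested above, so that only the part
       that differs lives in the branch. All nested types here are simplified,
       hypothetical stand-ins (the real `DruidTable` takes a `RowSignature`,
       which is replaced by a plain string to keep the example self-contained).

```java
import java.util.Collections;
import java.util.Set;

final class BuildTableSketch {
  // Hypothetical stand-ins for the Druid classes named in the diff.
  static class TableDataSource {
    final String name;

    TableDataSource(String name) {
      this.name = name;
    }
  }

  static class GlobalTableDataSource extends TableDataSource {
    GlobalTableDataSource(String name) {
      super(name);
    }
  }

  static class DruidTable {
    final TableDataSource dataSource;
    final String signature;

    DruidTable(TableDataSource dataSource, String signature) {
      this.dataSource = dataSource;
      this.signature = signature;
    }
  }

  static DruidTable buildDruidTable(String dataSource, String signature, Set<String> broadcastDatasources) {
    // Only the choice of data source type depends on the broadcast check.
    final TableDataSource tableDataSource;
    if (broadcastDatasources.contains(dataSource)) {
      tableDataSource = new GlobalTableDataSource(dataSource);
    } else {
      tableDataSource = new TableDataSource(dataSource);
    }
    // The table itself is built the same way in both cases.
    return new DruidTable(tableDataSource, signature);
  }

  public static void main(String[] args) {
    Set<String> broadcast = Collections.singleton("countries");
    DruidTable table = buildDruidTable("countries", "sig", broadcast);
    System.out.println(table.dataSource instanceof GlobalTableDataSource);
  }
}
```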

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -122,6 +128,8 @@
   // All dataSources that need tables regenerated.
   private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
 
+  private final Set<String> broadcastDatasources = new HashSet<>();

Review comment:
       This should be `@GuardedBy("lock")`, and so should 
`dataSourcesNeedingRebuild`, `mutableSegments`, `segmentsNeedingRefresh`, 
`refreshImmediately`, `lastRefresh`, `lastFailure`, and 
`isServerViewInitialized`.
   
   Could you please add those, and also remove the comment on `lock`, which is 
woefully out of date? (Thanks in advance for the housekeeping work.)
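
       For reference, a small sketch of the annotation pattern being asked
       for. To stay dependency-free, the example declares its own local
       `GuardedBy` stand-in; in the real class the annotation would come from
       whichever library the project already uses, which is not shown in this
       diff. The `markBroadcast` and `isBroadcast` methods are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Set;

final class GuardedBySketch {
  // Local stand-in annotation, declared here only so the sketch compiles alone.
  @Target(ElementType.FIELD)
  @interface GuardedBy {
    String value();
  }

  private final Object lock = new Object();

  // Each piece of shared state names the monitor that protects it.
  @GuardedBy("lock")
  private final Set<String> broadcastDatasources = new HashSet<>();

  @GuardedBy("lock")
  private long lastRefresh = 0L;

  // Hypothetical mutator: every write happens while holding the lock.
  void markBroadcast(String dataSource, long now) {
    synchronized (lock) {
      broadcastDatasources.add(dataSource);
      lastRefresh = now;
    }
  }

  // Hypothetical reader: reads take the same lock as writes.
  boolean isBroadcast(String dataSource) {
    synchronized (lock) {
      return broadcastDatasources.contains(dataSource);
    }
  }

  public static void main(String[] args) {
    GuardedBySketch sketch = new GuardedBySketch();
    sketch.markBroadcast("countries", System.currentTimeMillis());
    System.out.println(sketch.isBroadcast("countries"));
  }
}
```

       The annotation documents the locking contract next to each field, and
       static analysis tools that understand it can flag accesses made without
       holding `lock`.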




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


