clintropolis commented on a change in pull request #10020:
URL: https://github.com/apache/druid/pull/10020#discussion_r440818261



##########
File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,132 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              while (!Thread.currentThread().isInterrupted()) {
-                final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-                final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-                try {
-                  synchronized (lock) {
-                    final long nextRefreshNoFuzz = DateTimes
-                        .utc(lastRefresh)
-                        .plus(config.getMetadataRefreshPeriod())
-                        .getMillis();
-
-                    // Fuzz a bit to spread load out when we have multiple 
brokers.
-                    final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                    while (true) {
-                      // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
-                      final boolean wasRecentFailure = 
DateTimes.utc(lastFailure)
-                                                                
.plus(config.getMetadataRefreshPeriod())
-                                                                .isAfterNow();
-
-                      if (isServerViewInitialized &&
-                          !wasRecentFailure &&
-                          (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
-                          (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
-                        // We need to do a refresh. Break out of the waiting 
loop.
-                        break;
-                      }
-
-                      if (isServerViewInitialized) {
-                        // Server view is initialized, but we don't need to do 
a refresh. Could happen if there are
-                        // no segments in the system yet. Just mark us as 
initialized, then.
-                        initialized.countDown();
-                      }
-
-                      // Wait some more, we'll wake up when it might be time 
to do another refresh.
-                      lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+              try {
+                synchronized (lock) {
+                  final long nextRefreshNoFuzz = DateTimes
+                      .utc(lastRefresh)
+                      .plus(config.getMetadataRefreshPeriod())
+                      .getMillis();
+
+                  // Fuzz a bit to spread load out when we have multiple 
brokers.
+                  final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+                  while (true) {
+                    // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
+                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                              
.plus(config.getMetadataRefreshPeriod())
+                                                              .isAfterNow();
+
+                    if (isServerViewInitialized &&
+                        !wasRecentFailure &&
+                        (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
+                        (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
+                      // We need to do a refresh. Break out of the waiting 
loop.
+                      break;
                     }
 
-                    segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                    segmentsNeedingRefresh.clear();
-
-                    // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
-                    segmentsNeedingRefresh.addAll(mutableSegments);
+                    if (isServerViewInitialized) {
+                      // Server view is initialized, but we don't need to do a 
refresh. Could happen if there are
+                      // no segments in the system yet. Just mark us as 
initialized, then.
+                      initialized.countDown();
+                    }
 
-                    lastFailure = 0L;
-                    lastRefresh = System.currentTimeMillis();
-                    refreshImmediately = false;
+                    // Wait some more, we'll wake up when it might be time to 
do another refresh.
+                    lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
                   }
 
-                  // Refresh the segments.
-                  final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
+                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
+                  segmentsNeedingRefresh.clear();
 
-                  synchronized (lock) {
-                    // Add missing segments back to the refresh list.
-                    
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
+                  // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
+                  segmentsNeedingRefresh.addAll(mutableSegments);
 
-                    // Compute the list of dataSources to rebuild tables for.
-                    dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
-                    refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
-                    dataSourcesNeedingRebuild.clear();
+                  lastFailure = 0L;
+                  lastRefresh = System.currentTimeMillis();
+                  refreshImmediately = false;
+                }
 
-                    lock.notifyAll();
-                  }
+                // Refresh the segments.
+                final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
 
-                  // Rebuild the dataSources.
-                  for (String dataSource : dataSourcesToRebuild) {
-                    final DruidTable druidTable = buildDruidTable(dataSource);
-                    final DruidTable oldTable = tables.put(dataSource, 
druidTable);
-                    if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-                      log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
-                    } else {
-                      log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
-                    }
-                  }
+                synchronized (lock) {
+                  // Add missing segments back to the refresh list.
+                  
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
 
-                  initialized.countDown();
-                }
-                catch (InterruptedException e) {
-                  // Fall through.
-                  throw e;
+                  // Compute the list of dataSources to rebuild tables for.
+                  dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
+                  refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
+                  dataSourcesNeedingRebuild.clear();
+
+                  lock.notifyAll();
                 }
-                catch (Exception e) {
-                  log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                  synchronized (lock) {
-                    // Add our segments and dataSources back to their refresh 
and rebuild lists.
-                    segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                    dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                    lastFailure = System.currentTimeMillis();
-                    lock.notifyAll();
+
+                // Rebuild the dataSources.
+                for (String dataSource : dataSourcesToRebuild) {
+                  final DruidTable druidTable = buildDruidTable(dataSource);
+                  final DruidTable oldTable = tables.put(dataSource, 
druidTable);
+                  if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+                    log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
+                  } else {
+                    log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
                   }
                 }
+
+                initialized.countDown();
+              }
+              catch (InterruptedException e) {
+                // Fall through.
+                throw e;
+              }
+              catch (Exception e) {
+                log.warn(e, "Metadata refresh failed, trying again soon.");
+
+                synchronized (lock) {
+                  // Add our segments and dataSources back to their refresh 
and rebuild lists.
+                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
+                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+                  lastFailure = System.currentTimeMillis();
+                  lock.notifyAll();
+                }
               }
-            }
-            catch (InterruptedException e) {
-              // Just exit.
-            }
-            catch (Throwable e) {
-              // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
-              // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
-              log.makeAlert(e, "Metadata refresh failed permanently").emit();
-              throw e;
-            }
-            finally {
-              log.info("Metadata refresh stopped.");
             }
           }
+          catch (InterruptedException e) {
+            // Just exit.
+          }
+          catch (Throwable e) {
+            // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
+            // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
+            log.makeAlert(e, "Metadata refresh failed permanently").emit();
+            throw e;
+          }
+          finally {
+            log.info("Metadata refresh stopped.");
+          }
+        }
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(
+        localSegmentExec,
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        () -> {
+          synchronized (lock) {
+            // refresh known broadcast segments. Since DruidSchema is only 
present on the broker, any segment we have
+            // locally in the SegmentManager must be broadcast datasources. 
This could potentially be replaced in the
+            // future by fetching load rules from the coordinator
+            Set<String> localSegmentDatasources = 
segmentManager.getDataSourceNames();
+            dataSourcesNeedingRebuild.addAll(localSegmentDatasources);

Review comment:
       We don't really; I've refactored this pretty heavily.
   
   I reworked the fix I did for #10017 to instead preserve the segment/timeline event callbacks from the `BrokerServerView` (while still not adding the server/segment to the timeline, to avoid the weird loops). This in turn allows `DruidSchema` to get these events for broker segments and mark their datasources to be rebuilt in the normal path, just skipping the metadata fetch on the assumption that the segment will appear, so that the existing loop handles these changes correctly.

##########
File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -616,6 +631,12 @@ private DruidTable buildDruidTable(final String dataSource)
 
       final RowSignature.Builder builder = RowSignature.builder();
       columnTypes.forEach(builder::add);
+      if (broadcastDatasources.contains(dataSource)) {

Review comment:
       modified

##########
File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,132 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              while (!Thread.currentThread().isInterrupted()) {
-                final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-                final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-                try {
-                  synchronized (lock) {
-                    final long nextRefreshNoFuzz = DateTimes
-                        .utc(lastRefresh)
-                        .plus(config.getMetadataRefreshPeriod())
-                        .getMillis();
-
-                    // Fuzz a bit to spread load out when we have multiple 
brokers.
-                    final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                    while (true) {
-                      // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
-                      final boolean wasRecentFailure = 
DateTimes.utc(lastFailure)
-                                                                
.plus(config.getMetadataRefreshPeriod())
-                                                                .isAfterNow();
-
-                      if (isServerViewInitialized &&
-                          !wasRecentFailure &&
-                          (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
-                          (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
-                        // We need to do a refresh. Break out of the waiting 
loop.
-                        break;
-                      }
-
-                      if (isServerViewInitialized) {
-                        // Server view is initialized, but we don't need to do 
a refresh. Could happen if there are
-                        // no segments in the system yet. Just mark us as 
initialized, then.
-                        initialized.countDown();
-                      }
-
-                      // Wait some more, we'll wake up when it might be time 
to do another refresh.
-                      lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+              try {
+                synchronized (lock) {
+                  final long nextRefreshNoFuzz = DateTimes
+                      .utc(lastRefresh)
+                      .plus(config.getMetadataRefreshPeriod())
+                      .getMillis();
+
+                  // Fuzz a bit to spread load out when we have multiple 
brokers.
+                  final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+                  while (true) {
+                    // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
+                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                              
.plus(config.getMetadataRefreshPeriod())
+                                                              .isAfterNow();
+
+                    if (isServerViewInitialized &&
+                        !wasRecentFailure &&
+                        (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
+                        (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
+                      // We need to do a refresh. Break out of the waiting 
loop.
+                      break;
                     }
 
-                    segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                    segmentsNeedingRefresh.clear();
-
-                    // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
-                    segmentsNeedingRefresh.addAll(mutableSegments);
+                    if (isServerViewInitialized) {
+                      // Server view is initialized, but we don't need to do a 
refresh. Could happen if there are
+                      // no segments in the system yet. Just mark us as 
initialized, then.
+                      initialized.countDown();
+                    }
 
-                    lastFailure = 0L;
-                    lastRefresh = System.currentTimeMillis();
-                    refreshImmediately = false;
+                    // Wait some more, we'll wake up when it might be time to 
do another refresh.
+                    lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
                   }
 
-                  // Refresh the segments.
-                  final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
+                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
+                  segmentsNeedingRefresh.clear();
 
-                  synchronized (lock) {
-                    // Add missing segments back to the refresh list.
-                    
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
+                  // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
+                  segmentsNeedingRefresh.addAll(mutableSegments);
 
-                    // Compute the list of dataSources to rebuild tables for.
-                    dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
-                    refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
-                    dataSourcesNeedingRebuild.clear();
+                  lastFailure = 0L;
+                  lastRefresh = System.currentTimeMillis();
+                  refreshImmediately = false;
+                }
 
-                    lock.notifyAll();
-                  }
+                // Refresh the segments.
+                final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
 
-                  // Rebuild the dataSources.
-                  for (String dataSource : dataSourcesToRebuild) {
-                    final DruidTable druidTable = buildDruidTable(dataSource);
-                    final DruidTable oldTable = tables.put(dataSource, 
druidTable);
-                    if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-                      log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
-                    } else {
-                      log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
-                    }
-                  }
+                synchronized (lock) {
+                  // Add missing segments back to the refresh list.
+                  
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
 
-                  initialized.countDown();
-                }
-                catch (InterruptedException e) {
-                  // Fall through.
-                  throw e;
+                  // Compute the list of dataSources to rebuild tables for.
+                  dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
+                  refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
+                  dataSourcesNeedingRebuild.clear();
+
+                  lock.notifyAll();
                 }
-                catch (Exception e) {
-                  log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                  synchronized (lock) {
-                    // Add our segments and dataSources back to their refresh 
and rebuild lists.
-                    segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                    dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                    lastFailure = System.currentTimeMillis();
-                    lock.notifyAll();
+
+                // Rebuild the dataSources.
+                for (String dataSource : dataSourcesToRebuild) {
+                  final DruidTable druidTable = buildDruidTable(dataSource);
+                  final DruidTable oldTable = tables.put(dataSource, 
druidTable);
+                  if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+                    log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
+                  } else {
+                    log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
                   }
                 }
+
+                initialized.countDown();
+              }
+              catch (InterruptedException e) {
+                // Fall through.
+                throw e;
+              }
+              catch (Exception e) {
+                log.warn(e, "Metadata refresh failed, trying again soon.");
+
+                synchronized (lock) {
+                  // Add our segments and dataSources back to their refresh 
and rebuild lists.
+                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
+                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+                  lastFailure = System.currentTimeMillis();
+                  lock.notifyAll();
+                }
               }
-            }
-            catch (InterruptedException e) {
-              // Just exit.
-            }
-            catch (Throwable e) {
-              // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
-              // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
-              log.makeAlert(e, "Metadata refresh failed permanently").emit();
-              throw e;
-            }
-            finally {
-              log.info("Metadata refresh stopped.");
             }
           }
+          catch (InterruptedException e) {
+            // Just exit.
+          }
+          catch (Throwable e) {
+            // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
+            // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
+            log.makeAlert(e, "Metadata refresh failed permanently").emit();
+            throw e;
+          }
+          finally {
+            log.info("Metadata refresh stopped.");
+          }
+        }
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(

Review comment:
       Removed this executor entirely in a refactor

##########
File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -122,6 +128,8 @@
   // All dataSources that need tables regenerated.
   private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
 
+  private final Set<String> broadcastDatasources = new HashSet<>();

Review comment:
       Added annotations, though I actually ended up removing this field in a refactor.

##########
File path: 
processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class GlobalTableDataSourceTest
+{
+  private static final GlobalTableDataSource GLOBAL_TABLE_DATA_SOURCE = new 
GlobalTableDataSource("foo");
+
+  @Test
+  public void testEquals()

Review comment:
       added

##########
File path: processing/src/main/java/org/apache/druid/query/TableDataSource.java
##########
@@ -102,6 +102,11 @@ public final boolean equals(Object o)
       return false;
     }
 
+    if ((o instanceof GlobalTableDataSource || this instanceof 
GlobalTableDataSource) &&
+        !getClass().equals(o.getClass())) {

Review comment:
       I haven't made this change yet; I'm still considering the best way to do it and need to think about it more. `LegacyDataSource` is also a `TableDataSource`, and the two need to be equal to each other, so I think something somewhere is going to be gross...

##########
File path: 
processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * {@link TableDataSource} variant for globally available 'broadcast' 
segments. If bound to a
+ * {@link org.apache.druid.segment.join.JoinableFactory} that can create an
+ * {@link org.apache.druid.segment.join.table.IndexedTable} using 
DruidBinders.joinableFactoryBinder, this allows
+ * optimal usage of segments using this DataSource type in join operations 
(because they are global), and so can be pushed
+ * down to historicals as a {@link JoinDataSource}, instead of requiring a 
subquery join using
+ * {@link InlineDataSource} to construct an {@link 
org.apache.druid.segment.join.table.IndexedTable} on the fly on the
+ * broker. Because it is also a {@link TableDataSource}, when queried 
directly, or on the left hand side of a join,
+ * they will be treated as any normal segment.
+ */
+@JsonTypeName("global")
+public class GlobalTableDataSource extends TableDataSource
+{
+  @JsonCreator
+  public GlobalTableDataSource(@JsonProperty("name") String name)
+  {
+    super(name);
+  }
+
+  @Override
+  public boolean isCacheable()
+  {
+    return false;

Review comment:
       I think it should be cacheable, so I changed it.

##########
File path: 
processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * {@link TableDataSource} variant for globally available 'broadcast' 
segments. If bound to a
+ * {@link org.apache.druid.segment.join.JoinableFactory} that can create an
+ * {@link org.apache.druid.segment.join.table.IndexedTable} using 
DruidBinders.joinableFactoryBinder, this allows
+ * optimal usage of segments using this DataSource type in join operations 
(because they are global), and so can be pushed
+ * down to historicals as a {@link JoinDataSource}, instead of requiring a 
subquery join using
+ * {@link InlineDataSource} to construct an {@link 
org.apache.druid.segment.join.table.IndexedTable} on the fly on the
+ * broker. Because it is also a {@link TableDataSource}, when queried 
directly, or on the left hand side of a join,
+ * they will be treated as any normal segment.

Review comment:
       fixed

##########
File path: 
sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,132 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              while (!Thread.currentThread().isInterrupted()) {
-                final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-                final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-                try {
-                  synchronized (lock) {
-                    final long nextRefreshNoFuzz = DateTimes
-                        .utc(lastRefresh)
-                        .plus(config.getMetadataRefreshPeriod())
-                        .getMillis();
-
-                    // Fuzz a bit to spread load out when we have multiple 
brokers.
-                    final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                    while (true) {
-                      // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
-                      final boolean wasRecentFailure = 
DateTimes.utc(lastFailure)
-                                                                
.plus(config.getMetadataRefreshPeriod())
-                                                                .isAfterNow();
-
-                      if (isServerViewInitialized &&
-                          !wasRecentFailure &&
-                          (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
-                          (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
-                        // We need to do a refresh. Break out of the waiting 
loop.
-                        break;
-                      }
-
-                      if (isServerViewInitialized) {
-                        // Server view is initialized, but we don't need to do 
a refresh. Could happen if there are
-                        // no segments in the system yet. Just mark us as 
initialized, then.
-                        initialized.countDown();
-                      }
-
-                      // Wait some more, we'll wake up when it might be time 
to do another refresh.
-                      lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+              try {
+                synchronized (lock) {
+                  final long nextRefreshNoFuzz = DateTimes
+                      .utc(lastRefresh)
+                      .plus(config.getMetadataRefreshPeriod())
+                      .getMillis();
+
+                  // Fuzz a bit to spread load out when we have multiple 
brokers.
+                  final long nextRefresh = nextRefreshNoFuzz + (long) 
((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+                  while (true) {
+                    // Do not refresh if it's too soon after a failure (to 
avoid rapid cycles of failure).
+                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                              
.plus(config.getMetadataRefreshPeriod())
+                                                              .isAfterNow();
+
+                    if (isServerViewInitialized &&
+                        !wasRecentFailure &&
+                        (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
+                        (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
+                      // We need to do a refresh. Break out of the waiting 
loop.
+                      break;
                     }
 
-                    segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                    segmentsNeedingRefresh.clear();
-
-                    // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
-                    segmentsNeedingRefresh.addAll(mutableSegments);
+                    if (isServerViewInitialized) {
+                      // Server view is initialized, but we don't need to do a 
refresh. Could happen if there are
+                      // no segments in the system yet. Just mark us as 
initialized, then.
+                      initialized.countDown();
+                    }
 
-                    lastFailure = 0L;
-                    lastRefresh = System.currentTimeMillis();
-                    refreshImmediately = false;
+                    // Wait some more, we'll wake up when it might be time to 
do another refresh.
+                    lock.wait(Math.max(1, nextRefresh - 
System.currentTimeMillis()));
                   }
 
-                  // Refresh the segments.
-                  final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
+                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
+                  segmentsNeedingRefresh.clear();
 
-                  synchronized (lock) {
-                    // Add missing segments back to the refresh list.
-                    
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
+                  // Mutable segments need a refresh every period, since new 
columns could be added dynamically.
+                  segmentsNeedingRefresh.addAll(mutableSegments);
 
-                    // Compute the list of dataSources to rebuild tables for.
-                    dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
-                    refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
-                    dataSourcesNeedingRebuild.clear();
+                  lastFailure = 0L;
+                  lastRefresh = System.currentTimeMillis();
+                  refreshImmediately = false;
+                }
 
-                    lock.notifyAll();
-                  }
+                // Refresh the segments.
+                final Set<SegmentId> refreshed = 
refreshSegments(segmentsToRefresh);
 
-                  // Rebuild the dataSources.
-                  for (String dataSource : dataSourcesToRebuild) {
-                    final DruidTable druidTable = buildDruidTable(dataSource);
-                    final DruidTable oldTable = tables.put(dataSource, 
druidTable);
-                    if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-                      log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
-                    } else {
-                      log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
-                    }
-                  }
+                synchronized (lock) {
+                  // Add missing segments back to the refresh list.
+                  
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
 
-                  initialized.countDown();
-                }
-                catch (InterruptedException e) {
-                  // Fall through.
-                  throw e;
+                  // Compute the list of dataSources to rebuild tables for.
+                  dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
+                  refreshed.forEach(segment -> 
dataSourcesToRebuild.add(segment.getDataSource()));
+                  dataSourcesNeedingRebuild.clear();
+
+                  lock.notifyAll();
                 }
-                catch (Exception e) {
-                  log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                  synchronized (lock) {
-                    // Add our segments and dataSources back to their refresh 
and rebuild lists.
-                    segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                    dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                    lastFailure = System.currentTimeMillis();
-                    lock.notifyAll();
+
+                // Rebuild the dataSources.
+                for (String dataSource : dataSourcesToRebuild) {
+                  final DruidTable druidTable = buildDruidTable(dataSource);
+                  final DruidTable oldTable = tables.put(dataSource, 
druidTable);
+                  if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+                    log.info("dataSource [%s] has new signature: %s.", 
dataSource, druidTable.getRowSignature());
+                  } else {
+                    log.debug("dataSource [%s] signature is unchanged.", 
dataSource);
                   }
                 }
+
+                initialized.countDown();
+              }
+              catch (InterruptedException e) {
+                // Fall through.
+                throw e;
+              }
+              catch (Exception e) {
+                log.warn(e, "Metadata refresh failed, trying again soon.");
+
+                synchronized (lock) {
+                  // Add our segments and dataSources back to their refresh 
and rebuild lists.
+                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
+                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+                  lastFailure = System.currentTimeMillis();
+                  lock.notifyAll();
+                }
               }
-            }
-            catch (InterruptedException e) {
-              // Just exit.
-            }
-            catch (Throwable e) {
-              // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
-              // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
-              log.makeAlert(e, "Metadata refresh failed permanently").emit();
-              throw e;
-            }
-            finally {
-              log.info("Metadata refresh stopped.");
             }
           }
+          catch (InterruptedException e) {
+            // Just exit.
+          }
+          catch (Throwable e) {
+            // Throwables that fall out to here (not caught by an inner 
try/catch) are potentially gnarly, like
+            // OOMEs. Anyway, let's just emit an alert and stop refreshing 
metadata.
+            log.makeAlert(e, "Metadata refresh failed permanently").emit();
+            throw e;
+          }
+          finally {
+            log.info("Metadata refresh stopped.");
+          }
+        }
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(
+        localSegmentExec,
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        () -> {
+          synchronized (lock) {
+            // refresh known broadcast segments. Since DruidSchema is only present on the broker, any segment we have
+            // locally in the SegmentManager must be broadcast datasources. This could potentially be replaced in the
+            // future by fetching load rules from the coordinator
+            Set<String> localSegmentDatasources = 
segmentManager.getDataSourceNames();

Review comment:
       this logic has been removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to