This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bbbb6e1c3f fix DruidSchema issue where datasources with no segments can become stuck in tables list indefinitely (#12727)
bbbb6e1c3f is described below

commit bbbb6e1c3f9ea52766fe0c330fe8e6d0249560a4
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Jul 1 18:54:01 2022 -0700

    fix DruidSchema issue where datasources with no segments can become stuck in tables list indefinitely (#12727)
---
 .../apache/druid/tests/query/ITBroadcastJoinQueryTest.java   |  4 +++-
 .../org/apache/druid/sql/calcite/schema/DruidSchema.java     | 12 +++++++++++-
 .../org/apache/druid/sql/calcite/schema/DruidSchemaTest.java | 12 ++++++++++++
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
index 9642199707..22323edef7 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
@@ -84,7 +84,8 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
               ImmutableList.of()
           );
         }
-        catch (Exception ignored) {
+        catch (Exception e) {
+          LOG.error(e, "Failed to post load rules");
         }
       });
 
@@ -127,6 +128,7 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
           
replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_QUERIES_RESOURCE), 
BROADCAST_JOIN_DATASOURCE)
       );
     }
+
     finally {
       closer.close();
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 702ad82e38..785913b0a6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -68,6 +68,7 @@ import org.apache.druid.sql.calcite.table.DruidTable;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -402,6 +403,11 @@ public class DruidSchema extends AbstractSchema
     // Rebuild the dataSources.
     for (String dataSource : dataSourcesToRebuild) {
       final DruidTable druidTable = buildDruidTable(dataSource);
+      if (druidTable == null) {
+        log.info("dataSource[%s] no longer exists, all metadata removed.", 
dataSource);
+        tables.remove(dataSource);
+        continue;
+      }
       final DruidTable oldTable = tables.put(dataSource, druidTable);
       final String description = druidTable.getDataSource().isGlobal() ? 
"global dataSource" : "dataSource";
       if (oldTable == null || 
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
@@ -773,12 +779,13 @@ public class DruidSchema extends AbstractSchema
   }
 
   @VisibleForTesting
+  @Nullable
   DruidTable buildDruidTable(final String dataSource)
   {
     ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentsMap = 
segmentMetadataInfo.get(dataSource);
     final Map<String, ColumnType> columnTypes = new TreeMap<>();
 
-    if (segmentsMap != null) {
+    if (segmentsMap != null && !segmentsMap.isEmpty()) {
       for (AvailableSegmentMetadata availableSegmentMetadata : 
segmentsMap.values()) {
         final RowSignature rowSignature = 
availableSegmentMetadata.getRowSignature();
         if (rowSignature != null) {
@@ -792,6 +799,9 @@ public class DruidSchema extends AbstractSchema
           }
         }
       }
+    } else {
+      // table has no segments
+      return null;
     }
 
     final RowSignature.Builder builder = RowSignature.builder();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 2a36d08c47..708ba50c4d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -77,6 +77,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1199,6 +1200,17 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
     );
   }
 
+  @Test
+  public void testStaleDatasourceRefresh() throws IOException
+  {
+    Set<SegmentId> segments = new HashSet<>();
+    Set<String> datasources = new HashSet<>();
+    datasources.add("wat");
+    Assert.assertNull(schema.getTable("wat"));
+    schema.refresh(segments, datasources);
+    Assert.assertNull(schema.getTable("wat"));
+  }
+
   private static DataSegment newSegment(String datasource, int partitionId)
   {
     return new DataSegment(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to