This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bbbb6e1c3f fix DruidSchema issue where datasources with no segments
can become stuck in tables list indefinitely (#12727)
bbbb6e1c3f is described below
commit bbbb6e1c3f9ea52766fe0c330fe8e6d0249560a4
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Jul 1 18:54:01 2022 -0700
fix DruidSchema issue where datasources with no segments can become stuck
in tables list indefinitely (#12727)
---
.../apache/druid/tests/query/ITBroadcastJoinQueryTest.java | 4 +++-
.../org/apache/druid/sql/calcite/schema/DruidSchema.java | 12 +++++++++++-
.../org/apache/druid/sql/calcite/schema/DruidSchemaTest.java | 12 ++++++++++++
3 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
index 9642199707..22323edef7 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
@@ -84,7 +84,8 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
ImmutableList.of()
);
}
- catch (Exception ignored) {
+ catch (Exception e) {
+ LOG.error(e, "Failed to post load rules");
}
});
@@ -127,6 +128,7 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_QUERIES_RESOURCE),
BROADCAST_JOIN_DATASOURCE)
);
}
+
finally {
closer.close();
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 702ad82e38..785913b0a6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -68,6 +68,7 @@ import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
@@ -402,6 +403,11 @@ public class DruidSchema extends AbstractSchema
// Rebuild the dataSources.
for (String dataSource : dataSourcesToRebuild) {
final DruidTable druidTable = buildDruidTable(dataSource);
+ if (druidTable == null) {
+ log.info("dataSource[%s] no longer exists, all metadata removed.", dataSource);
+ tables.remove(dataSource);
+ continue;
+ }
final DruidTable oldTable = tables.put(dataSource, druidTable);
final String description = druidTable.getDataSource().isGlobal() ?
"global dataSource" : "dataSource";
if (oldTable == null ||
!oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
@@ -773,12 +779,13 @@ public class DruidSchema extends AbstractSchema
}
@VisibleForTesting
+ @Nullable
DruidTable buildDruidTable(final String dataSource)
{
ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentsMap =
segmentMetadataInfo.get(dataSource);
final Map<String, ColumnType> columnTypes = new TreeMap<>();
- if (segmentsMap != null) {
+ if (segmentsMap != null && !segmentsMap.isEmpty()) {
for (AvailableSegmentMetadata availableSegmentMetadata :
segmentsMap.values()) {
final RowSignature rowSignature =
availableSegmentMetadata.getRowSignature();
if (rowSignature != null) {
@@ -792,6 +799,9 @@ public class DruidSchema extends AbstractSchema
}
}
}
+ } else {
+ // table has no segments
+ return null;
}
final RowSignature.Builder builder = RowSignature.builder();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 2a36d08c47..708ba50c4d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -77,6 +77,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1199,6 +1200,17 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
);
}
+ @Test
+ public void testStaleDatasourceRefresh() throws IOException
+ {
+ Set<SegmentId> segments = new HashSet<>();
+ Set<String> datasources = new HashSet<>();
+ datasources.add("wat");
+ Assert.assertNull(schema.getTable("wat"));
+ schema.refresh(segments, datasources);
+ Assert.assertNull(schema.getTable("wat"));
+ }
+
private static DataSegment newSegment(String datasource, int partitionId)
{
return new DataSegment(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]