This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5446494e63c Non-existent datasource shouldn't affect schema rebuilding for other datasources (#15355)
5446494e63c is described below
commit 5446494e63c92986cbca9a7e34aad8e94b85daf0
Author: Rishabh Singh <[email protected]>
AuthorDate: Tue Nov 14 12:52:33 2023 +0530
Non-existent datasource shouldn't affect schema rebuilding for other datasources (#15355)
Pull request #14985 introduced a bug: the periodic refresh would stop rebuilding datasource schemas as soon as it encountered a non-existent datasource, leaving all remaining datasources with stale schema information.
This change fixes the bug and adds unit tests that validate the refresh mechanism's behaviour when a datasource is removed while other datasources have schema changes.
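
For readers skimming the patch: the core of the fix is a one-word control-flow
change in the schema rebuild loop. The old "return" aborted the entire refresh
as soon as one datasource came back with a null row signature, whereas
"continue" skips only that datasource. Below is a minimal, self-contained
sketch of the pattern; the class and method names (RebuildLoopSketch,
buildRowSignature) are hypothetical stand-ins, not the actual Druid code.

    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;

    class RebuildLoopSketch
    {
      // Cached "schema" per datasource; a String stands in for RowSignature.
      private final Map<String, String> tables = new LinkedHashMap<>();

      // Stand-in for buildDataSourceRowSignature(): returns null when the
      // datasource no longer has any segments.
      private String buildRowSignature(String dataSource)
      {
        return "foo2".equals(dataSource) ? null : "signature-of-" + dataSource;
      }

      void rebuild(Set<String> dataSourcesToRebuild)
      {
        for (String dataSource : dataSourcesToRebuild) {
          final String rowSignature = buildRowSignature(dataSource);
          if (rowSignature == null) {
            tables.remove(dataSource);
            // Before the fix this was "return;", which abandoned the loop and
            // left every datasource queued after the missing one with a stale
            // schema. "continue" skips only the missing datasource.
            continue;
          }
          tables.put(dataSource, rowSignature);
        }
      }

      public static void main(String[] args)
      {
        RebuildLoopSketch cache = new RebuildLoopSketch();
        // LinkedHashSet so the removed datasource is encountered first,
        // mirroring the ordering the new unit tests force.
        Set<String> toRebuild = new LinkedHashSet<>();
        toRebuild.add("foo2"); // removed datasource
        toRebuild.add("foo");  // must still be rebuilt
        cache.rebuild(toRebuild);
        System.out.println(cache.tables); // prints {foo=signature-of-foo}
      }
    }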
---
.../metadata/CoordinatorSegmentMetadataCache.java | 5 +-
.../CoordinatorSegmentMetadataCacheTest.java | 141 +++++++++++++++++++++
.../metadata/SegmentMetadataCacheCommon.java | 2 +-
.../calcite/schema/BrokerSegmentMetadataCache.java | 2 +-
.../schema/BrokerSegmentMetadataCacheTest.java | 141 +++++++++++++++++++++
5 files changed, 287 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 960921c9e93..1badb2383d4 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -134,10 +134,11 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     for (String dataSource : dataSourcesToRebuild) {
       final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
       if (rowSignature == null) {
-        log.info("RowSignature null for dataSource [%s], implying it no longer exists, all metadata removed.", dataSource);
+        log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource);
         tables.remove(dataSource);
-        return;
+        continue;
       }
+
       DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature);
       final DataSourceInformation oldTable = tables.put(dataSource, druidTable);
       if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 22a5d7a67c4..31176a17f19 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -27,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.InternalQueryConfig;
+import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -34,14 +36,21 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryResponse;
@@ -53,16 +62,19 @@ import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -361,6 +373,135 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
}
+ @Test
+ public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException
+ {
+ CountDownLatch addSegmentLatch = new CountDownLatch(7);
+ CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
+ getQueryLifecycleFactory(walker),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ new NoopServiceEmitter()
+ )
+ {
+ @Override
+ public void addSegment(final DruidServerMetadata server, final DataSegment segment)
+ {
+ super.addSegment(server, segment);
+ addSegmentLatch.countDown();
+ }
+
+ @Override
+ public void removeSegment(final DataSegment segment)
+ {
+ super.removeSegment(segment);
+ }
+
+ @Override
+ public void markDataSourceAsNeedRebuild(String datasource)
+ {
+ super.markDataSourceAsNeedRebuild(datasource);
+ markDataSourceLatch.countDown();
+ }
+
+ @Override
+ @VisibleForTesting
+ public void refresh(
+ final Set<SegmentId> segmentsToRefresh,
+ final Set<String> dataSourcesToRebuild) throws IOException
+ {
+ super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+ }
+ };
+
+ schema.start();
+ schema.awaitInitialization();
+
+ final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+ List<DataSegment> segments = segmentMetadatas.values()
+ .stream()
+ .map(AvailableSegmentMetadata::getSegment)
+ .collect(Collectors.toList());
+ Assert.assertEquals(6, segments.size());
+
+ // verify that dim3 column isn't present in schema for datasource foo
+ DataSourceInformation fooDs = schema.getDatasource("foo");
+ Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals));
+
+ // segments contains two segments with datasource "foo" and one with datasource "foo2"
+ // let's remove the only segment with datasource "foo2"
+ final DataSegment segmentToRemove = segments.stream()
+ .filter(segment -> segment.getDataSource().equals("foo2"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(segmentToRemove);
+ schema.removeSegment(segmentToRemove);
+
+ // we will add a segment to another datasource and
+ // check that the columns in this segment are reflected in the datasource schema
+ DataSegment newSegment =
+ DataSegment.builder()
+ .dataSource(DATASOURCE1)
+ .interval(Intervals.of("2002/P1Y"))
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build();
+
+ final File tmpDir = temporaryFolder.newFolder();
+
+ List<InputRow> rows = ImmutableList.of(
+ createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")),
+ createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")),
+ createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3"))
+ );
+
+ QueryableIndex index = IndexBuilder.create()
+ .tmpDir(new File(tmpDir, "1"))
+ .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(
+ new CountAggregatorFactory("cnt"),
+ new DoubleSumAggregatorFactory("m1", "m1"),
+ new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+ )
+ .withRollup(false)
+ .build()
+ )
+ .rows(rows)
+ .buildMMappedIndex();
+
+ walker.add(newSegment, index);
+ serverView.addSegment(newSegment, ServerType.HISTORICAL);
+
+ Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
+
+ Set<String> dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet());
+ dataSources.remove("foo2");
+
+ // LinkedHashSet to ensure we encounter the removed datasource first
+ Set<String> dataSourcesToRefresh = new LinkedHashSet<>();
+ dataSourcesToRefresh.add("foo2");
+ dataSourcesToRefresh.addAll(dataSources);
+
+ segments = schema.getSegmentMetadataSnapshot().values()
+ .stream()
+ .map(AvailableSegmentMetadata::getSegment)
+ .collect(Collectors.toList());
+
+ schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh);
+ Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size());
+
+ fooDs = schema.getDatasource("foo");
+
+ // check that the new column in the added segment is present in the datasource schema,
+ // ensuring that the schema was rebuilt
+ Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals));
+ }
+
@Test
public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException
{
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java
index d421a15e35f..fb7b87580e1 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java
@@ -286,7 +286,7 @@ public abstract class SegmentMetadataCacheCommon extends InitializedNullHandling
resourceCloser.close();
}
- InputRow createRow(final ImmutableMap<String, ?> map)
+ public InputRow createRow(final ImmutableMap<String, ?> map)
{
return MapInputRowParser.parse(FOO_SCHEMA, (Map<String, Object>) map);
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
index a52c6d89259..21c4a50996b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
@@ -201,7 +201,7 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
       if (rowSignature == null) {
         log.info("datasource [%s] no longer exists, all metadata removed.", dataSource);
         tables.remove(dataSource);
-        return;
+        continue;
       }
       final PhysicalDatasourceMetadata physicalDatasourceMetadata = dataSourceMetadataFactory.build(dataSource, rowSignature);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index 3cc566cfc1b..dd29082b858 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -44,15 +45,22 @@ import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.metadata.AbstractSegmentMetadataCache;
import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
import org.apache.druid.segment.metadata.DataSourceInformation;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryResponse;
@@ -67,6 +75,7 @@ import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
@@ -76,12 +85,14 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -575,6 +586,136 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheCo
Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
}
+ @Test
+ public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException
+ {
+ CountDownLatch addSegmentLatch = new CountDownLatch(7);
+ BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+ serverView,
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new InternalQueryConfig(),
+ new NoopServiceEmitter(),
+ new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
+ new NoopCoordinatorClient()
+ )
+ {
+ @Override
+ public void addSegment(final DruidServerMetadata server, final DataSegment segment)
+ {
+ super.addSegment(server, segment);
+ addSegmentLatch.countDown();
+ }
+
+ @Override
+ public void removeSegment(final DataSegment segment)
+ {
+ super.removeSegment(segment);
+ }
+
+ @Override
+ public void markDataSourceAsNeedRebuild(String datasource)
+ {
+ super.markDataSourceAsNeedRebuild(datasource);
+ }
+
+ @Override
+ @VisibleForTesting
+ public void refresh(
+ final Set<SegmentId> segmentsToRefresh,
+ final Set<String> dataSourcesToRebuild) throws IOException
+ {
+ super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+ }
+ };
+
+ schema.start();
+ schema.awaitInitialization();
+
+ final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+ List<DataSegment> segments = segmentMetadatas.values()
+ .stream()
+ .map(AvailableSegmentMetadata::getSegment)
+ .collect(Collectors.toList());
+ Assert.assertEquals(6, segments.size());
+
+ // verify that dim3 column isn't present in the schema for foo
+ DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo");
+ Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals));
+
+ // segments contains two segments with datasource "foo" and one with datasource "foo2"
+ // let's remove the only segment with datasource "foo2"
+ final DataSegment segmentToRemove = segments.stream()
+ .filter(segment -> segment.getDataSource().equals("foo2"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(segmentToRemove);
+ schema.removeSegment(segmentToRemove);
+
+ // we will add a segment to another datasource and
+ // check that the columns in this segment are reflected in the datasource schema
+ DataSegment newSegment =
+ DataSegment.builder()
+ .dataSource("foo")
+ .interval(Intervals.of("2002/P1Y"))
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build();
+
+ final File tmpDir = temporaryFolder.newFolder();
+
+ List<InputRow> rows = ImmutableList.of(
+ createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")),
+ createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")),
+ createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3"))
+ );
+
+ QueryableIndex index = IndexBuilder.create()
+ .tmpDir(new File(tmpDir, "1"))
+ .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(
+ new CountAggregatorFactory("cnt"),
+ new DoubleSumAggregatorFactory("m1", "m1"),
+ new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+ )
+ .withRollup(false)
+ .build()
+ )
+ .rows(rows)
+ .buildMMappedIndex();
+
+ walker.add(newSegment, index);
+ serverView.addSegment(newSegment, ServerType.HISTORICAL);
+
+ Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
+
+ Set<String> dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet());
+ dataSources.remove("foo2");
+
+ // LinkedHashSet to ensure that the datasource with no segments is encountered first
+ Set<String> dataSourcesToRefresh = new LinkedHashSet<>();
+ dataSourcesToRefresh.add("foo2");
+ dataSourcesToRefresh.addAll(dataSources);
+
+ segments = schema.getSegmentMetadataSnapshot().values()
+ .stream()
+ .map(AvailableSegmentMetadata::getSegment)
+ .collect(Collectors.toList());
+
+ schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh);
+ Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size());
+
+ fooDs = schema.getDatasource("foo");
+
+ // check that the new column in the added segment is present in the datasource schema,
+ // ensuring that the schema was rebuilt
+ Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals));
+ }
+
@Test
public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]