This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5446494e63c Non-existent datasource shouldn't affect schema rebuilding for other datasources (#15355)
5446494e63c is described below

commit 5446494e63c92986cbca9a7e34aad8e94b85daf0
Author: Rishabh Singh <[email protected]>
AuthorDate: Tue Nov 14 12:52:33 2023 +0530

    Non-existent datasource shouldn't affect schema rebuilding for other datasources (#15355)
    
    In pull request #14985, a bug was introduced where periodic refresh would
    skip rebuilding a datasource's schema after encountering a non-existent
    datasource. This resulted in the remaining datasources having stale schema
    information.
    
    This change addresses the bug and adds a unit test to validate the refresh
    mechanism's behaviour when a datasource is removed, and other datasources
    have schema changes.
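
    For readers skimming the patch: the essence of the fix is a one-word
    loop-control change, `return` -> `continue`. Below is a minimal,
    self-contained sketch of the pattern (plain Java, with String standing in
    for Druid's RowSignature so it compiles on its own; these are not the
    actual Druid classes):

        import java.util.List;
        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;

        class RebuildLoopSketch
        {
          // Stand-in for the cache's datasource-to-schema map.
          private final Map<String, String> tables = new ConcurrentHashMap<>();

          void rebuild(List<String> dataSourcesToRebuild)
          {
            for (String dataSource : dataSourcesToRebuild) {
              final String rowSignature = buildDataSourceRowSignature(dataSource);
              if (rowSignature == null) {
                // Datasource no longer exists: drop its metadata and move on.
                tables.remove(dataSource);
                continue; // was `return`, which aborted the rebuild of every remaining datasource
              }
              tables.put(dataSource, rowSignature);
            }
          }

          // Stand-in for buildDataSourceRowSignature, which returns null when
          // a datasource has no segments left.
          private String buildDataSourceRowSignature(String dataSource)
          {
            return dataSource.isEmpty() ? null : "signature of " + dataSource;
          }
        }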
---
 .../metadata/CoordinatorSegmentMetadataCache.java  |   5 +-
 .../CoordinatorSegmentMetadataCacheTest.java       | 141 +++++++++++++++++++++
 .../metadata/SegmentMetadataCacheCommon.java       |   2 +-
 .../calcite/schema/BrokerSegmentMetadataCache.java |   2 +-
 .../schema/BrokerSegmentMetadataCacheTest.java     | 141 +++++++++++++++++++++
 5 files changed, 287 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 960921c9e93..1badb2383d4 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -134,10 +134,11 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     for (String dataSource : dataSourcesToRebuild) {
       final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
       if (rowSignature == null) {
-        log.info("RowSignature null for dataSource [%s], implying it no longer 
exists, all metadata removed.", dataSource);
+        log.info("RowSignature null for dataSource [%s], implying that it no 
longer exists. All metadata removed.", dataSource);
         tables.remove(dataSource);
-        return;
+        continue;
       }
+
       DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature);
       final DataSourceInformation oldTable = tables.put(dataSource, druidTable);
       if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 22a5d7a67c4..31176a17f19 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -27,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.InternalQueryConfig;
+import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -34,14 +36,21 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
 import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
 import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
 import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.QueryLifecycle;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.QueryResponse;
@@ -53,16 +62,19 @@ import org.apache.druid.server.security.AllowAllAuthenticator;
 import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -361,6 +373,135 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
     Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
   }
 
+  @Test
+  public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException
+  {
+    CountDownLatch addSegmentLatch = new CountDownLatch(7);
+    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      public void addSegment(final DruidServerMetadata server, final DataSegment segment)
+      {
+        super.addSegment(server, segment);
+        addSegmentLatch.countDown();
+      }
+
+      @Override
+      public void removeSegment(final DataSegment segment)
+      {
+        super.removeSegment(segment);
+      }
+
+      @Override
+      public void markDataSourceAsNeedRebuild(String datasource)
+      {
+        super.markDataSourceAsNeedRebuild(datasource);
+        markDataSourceLatch.countDown();
+      }
+
+      @Override
+      @VisibleForTesting
+      public void refresh(
+          final Set<SegmentId> segmentsToRefresh,
+          final Set<String> dataSourcesToRebuild) throws IOException
+      {
+        super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+      }
+    };
+
+    schema.start();
+    schema.awaitInitialization();
+
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+    List<DataSegment> segments = segmentMetadatas.values()
+                                                 .stream()
+                                                 .map(AvailableSegmentMetadata::getSegment)
+                                                 .collect(Collectors.toList());
+    Assert.assertEquals(6, segments.size());
+
+    // verify that dim3 column isn't present in schema for datasource foo
+    DataSourceInformation fooDs = schema.getDatasource("foo");
+    Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals));
+
+    // segments contains two segments with datasource "foo" and one with datasource "foo2"
+    // let's remove the only segment with datasource "foo2"
+    final DataSegment segmentToRemove = segments.stream()
+                                                .filter(segment -> segment.getDataSource().equals("foo2"))
+                                                .findFirst()
+                                                .orElse(null);
+    Assert.assertNotNull(segmentToRemove);
+    schema.removeSegment(segmentToRemove);
+
+    // we will add a segment to another datasource and
+    // check that the columns in this segment are reflected in the datasource schema
+    DataSegment newSegment =
+        DataSegment.builder()
+                   .dataSource(DATASOURCE1)
+                   .interval(Intervals.of("2002/P1Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build();
+
+    final File tmpDir = temporaryFolder.newFolder();
+
+    List<InputRow> rows = ImmutableList.of(
+        createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")),
+        createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")),
+        createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3"))
+    );
+
+    QueryableIndex index = IndexBuilder.create()
+                                       .tmpDir(new File(tmpDir, "1"))
+                                       .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                                       .schema(
+                                           new IncrementalIndexSchema.Builder()
+                                               .withMetrics(
+                                                   new CountAggregatorFactory("cnt"),
+                                                   new DoubleSumAggregatorFactory("m1", "m1"),
+                                                   new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+                                               )
+                                               .withRollup(false)
+                                               .build()
+                                       )
+                                       .rows(rows)
+                                       .buildMMappedIndex();
+
+    walker.add(newSegment, index);
+    serverView.addSegment(newSegment, ServerType.HISTORICAL);
+
+    Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
+
+    Set<String> dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet());
+    dataSources.remove("foo2");
+
+    // LinkedHashSet to ensure that we encounter the removed datasource first
+    Set<String> dataSourcesToRefresh = new LinkedHashSet<>();
+    dataSourcesToRefresh.add("foo2");
+    dataSourcesToRefresh.addAll(dataSources);
+
+    segments = schema.getSegmentMetadataSnapshot().values()
+                    .stream()
+                    .map(AvailableSegmentMetadata::getSegment)
+                    .collect(Collectors.toList());
+
+    schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh);
+    Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size());
+
+    fooDs = schema.getDatasource("foo");
+
+    // check that the new column present in the added segment shows up in the datasource schema,
+    // ensuring that the schema is rebuilt
+    Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals));
+  }
+
   @Test
   public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException
   {
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java
index d421a15e35f..fb7b87580e1 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java
@@ -286,7 +286,7 @@ public abstract class SegmentMetadataCacheCommon extends InitializedNullHandling
     resourceCloser.close();
   }
 
-  InputRow createRow(final ImmutableMap<String, ?> map)
+  public InputRow createRow(final ImmutableMap<String, ?> map)
   {
     return MapInputRowParser.parse(FOO_SCHEMA, (Map<String, Object>) map);
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
index a52c6d89259..21c4a50996b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
@@ -201,7 +201,7 @@ public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<Phy
       if (rowSignature == null) {
         log.info("datasource [%s] no longer exists, all metadata removed.", 
dataSource);
         tables.remove(dataSource);
-        return;
+        continue;
       }
 
       final PhysicalDatasourceMetadata physicalDatasourceMetadata = dataSourceMetadataFactory.build(dataSource, rowSignature);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index 3cc566cfc1b..dd29082b858 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.client.InternalQueryConfig;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -44,15 +45,22 @@ import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
 import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.metadata.AbstractSegmentMetadataCache;
 import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
 import org.apache.druid.segment.metadata.DataSourceInformation;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.QueryLifecycle;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.QueryResponse;
@@ -67,6 +75,7 @@ import org.apache.druid.sql.calcite.table.DruidTable;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.junit.After;
@@ -76,12 +85,14 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -575,6 +586,136 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheCo
     Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
   }
 
+  @Test
+  public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException
+  {
+    CountDownLatch addSegmentLatch = new CountDownLatch(7);
+    BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
+        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
+        new NoopCoordinatorClient()
+    )
+    {
+      @Override
+      public void addSegment(final DruidServerMetadata server, final DataSegment segment)
+      {
+        super.addSegment(server, segment);
+        addSegmentLatch.countDown();
+      }
+
+      @Override
+      public void removeSegment(final DataSegment segment)
+      {
+        super.removeSegment(segment);
+      }
+
+      @Override
+      public void markDataSourceAsNeedRebuild(String datasource)
+      {
+        super.markDataSourceAsNeedRebuild(datasource);
+      }
+
+      @Override
+      @VisibleForTesting
+      public void refresh(
+          final Set<SegmentId> segmentsToRefresh,
+          final Set<String> dataSourcesToRebuild) throws IOException
+      {
+        super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+      }
+    };
+
+    schema.start();
+    schema.awaitInitialization();
+
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+    List<DataSegment> segments = segmentMetadatas.values()
+                                                 .stream()
+                                                 .map(AvailableSegmentMetadata::getSegment)
+                                                 .collect(Collectors.toList());
+    Assert.assertEquals(6, segments.size());
+
+    // verify that dim3 column isn't present in the schema for foo
+    DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo");
+    Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals));
+
+    // segments contains two segments with datasource "foo" and one with datasource "foo2"
+    // let's remove the only segment with datasource "foo2"
+    final DataSegment segmentToRemove = segments.stream()
+                                                .filter(segment -> segment.getDataSource().equals("foo2"))
+                                                .findFirst()
+                                                .orElse(null);
+    Assert.assertNotNull(segmentToRemove);
+    schema.removeSegment(segmentToRemove);
+
+    // we will add a segment to another datasource and
+    // check that the columns in this segment are reflected in the datasource schema
+    DataSegment newSegment =
+        DataSegment.builder()
+                   .dataSource("foo")
+                   .interval(Intervals.of("2002/P1Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build();
+
+    final File tmpDir = temporaryFolder.newFolder();
+
+    List<InputRow> rows = ImmutableList.of(
+        createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")),
+        createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")),
+        createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3"))
+    );
+
+    QueryableIndex index = IndexBuilder.create()
+                                        .tmpDir(new File(tmpDir, "1"))
+                                        .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                                        .schema(
+                                            new IncrementalIndexSchema.Builder()
+                                                .withMetrics(
+                                                    new CountAggregatorFactory("cnt"),
+                                                    new DoubleSumAggregatorFactory("m1", "m1"),
+                                                    new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+                                                )
+                                                .withRollup(false)
+                                                .build()
+                                        )
+                                        .rows(rows)
+                                        .buildMMappedIndex();
+
+    walker.add(newSegment, index);
+    serverView.addSegment(newSegment, ServerType.HISTORICAL);
+
+    Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
+
+    Set<String> dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet());
+    dataSources.remove("foo2");
+
+    // LinkedHashSet to ensure that the datasource with no segments is encountered first
+    Set<String> dataSourcesToRefresh = new LinkedHashSet<>();
+    dataSourcesToRefresh.add("foo2");
+    dataSourcesToRefresh.addAll(dataSources);
+
+    segments = schema.getSegmentMetadataSnapshot().values()
+                     .stream()
+                     .map(AvailableSegmentMetadata::getSegment)
+                     .collect(Collectors.toList());
+
+    schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh);
+    Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size());
+
+    fooDs = schema.getDatasource("foo");
+
+    // check that the new column present in the added segment shows up in the datasource schema,
+    // ensuring that the schema is rebuilt
+    Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals));
+  }
+
   @Test
   public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException
   {
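
A note on the new tests: both build a LinkedHashSet so that the removed
datasource ("foo2") is iterated first during refresh, recreating the exact
ordering that exposed the original bug. A minimal, self-contained illustration
of the insertion-order guarantee the tests lean on (plain Java, hypothetical
names, not Druid code):

    import java.util.LinkedHashSet;
    import java.util.Set;

    class IterationOrderSketch
    {
      public static void main(String[] args)
      {
        // LinkedHashSet iterates in insertion order, so the datasource added
        // first is also the first one the refresh loop encounters.
        Set<String> dataSourcesToRefresh = new LinkedHashSet<>();
        dataSourcesToRefresh.add("foo2"); // the removed datasource
        dataSourcesToRefresh.add("foo");

        // prints foo2, then foo -- always in insertion order
        dataSourcesToRefresh.forEach(System.out::println);
      }
    }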

