This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a8c06e93aaf Skip tombstone segment refresh in metadata cache (#17025)
a8c06e93aaf is described below

commit a8c06e93aafb3fce9ce9d62b657da78286b61307
Author: Rishabh Singh <[email protected]>
AuthorDate: Fri Sep 13 11:47:11 2024 +0530

    Skip tombstone segment refresh in metadata cache (#17025)
    
    PR #16890 introduced a change to skip adding tombstone segments to the 
cache.
    It turns out that as a side effect tombstone segments appear unavailable in 
the console. This happens because availability of a segment in Broker is 
determined from the metadata cache.
    
    The fix is to keep tombstone segments in the metadata cache but skip them 
during refresh.
    
    This doesn't affect any functionality, as the metadata query for tombstone 
segments returns empty results, which would otherwise cause continuous refresh of those segments.
---
 .../metadata/AbstractSegmentMetadataCache.java     |  24 ++--
 .../metadata/CoordinatorSegmentMetadataCache.java  |  50 ++++---
 .../CoordinatorSegmentMetadataCacheTest.java       | 155 ++++++++++++++-------
 .../schema/BrokerSegmentMetadataCacheTest.java     | 141 ++++++++++++-------
 4 files changed, 241 insertions(+), 129 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
index d918ec5e3f2..99d965ec643 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
@@ -102,6 +102,13 @@ import java.util.stream.StreamSupport;
  * <p>
  * This class has an abstract method {@link #refresh(Set, Set)} which the 
child class must override
  * with the logic to build and cache table schema.
+ * <p>
+ * Note on handling tombstone segments:
+ * These segments lack data or column information.
+ * Additionally, segment metadata queries, which are not yet implemented for 
tombstone segments
+ * (see: https://github.com/apache/druid/pull/12137), do not provide metadata 
for tombstones,
+ * leading to indefinite refresh attempts for these segments.
+ * Therefore, these segments are never added to the set of segments being 
refreshed.
  *
  * @param <T> The type of information associated with the data source, which 
must extend {@link DataSourceInformation}.
  */
@@ -478,13 +485,6 @@ public abstract class AbstractSegmentMetadataCache<T 
extends DataSourceInformati
   @VisibleForTesting
   public void addSegment(final DruidServerMetadata server, final DataSegment 
segment)
   {
-    // Skip adding tombstone segment to the cache. These segments lack data or 
column information.
-    // Additionally, segment metadata queries, which are not yet implemented 
for tombstone segments
-    // (see: https://github.com/apache/druid/pull/12137) do not provide 
metadata for tombstones,
-    // leading to indefinite refresh attempts for these segments.
-    if (segment.isTombstone()) {
-      return;
-    }
     // Get lock first so that we won't wait in ConcurrentMap.compute().
     synchronized (lock) {
       // someday we could hypothetically remove broker special casing, 
whenever BrokerServerView supports tracking
@@ -511,7 +511,11 @@ public abstract class AbstractSegmentMetadataCache<T 
extends DataSourceInformati
                       segmentMetadata = AvailableSegmentMetadata
                           .builder(segment, isRealtime, 
ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a 
refresh
                           .build();
-                      markSegmentAsNeedRefresh(segment.getId());
+                      if (segment.isTombstone()) {
+                        log.debug("Skipping refresh for tombstone segment.");
+                      } else {
+                        markSegmentAsNeedRefresh(segment.getId());
+                      }
                       if (!server.isSegmentReplicationTarget()) {
                         log.debug("Added new mutable segment [%s].", 
segment.getId());
                         markSegmentAsMutable(segment.getId());
@@ -557,10 +561,6 @@ public abstract class AbstractSegmentMetadataCache<T 
extends DataSourceInformati
   @VisibleForTesting
   public void removeSegment(final DataSegment segment)
   {
-    // tombstone segments are not present in the cache
-    if (segment.isTombstone()) {
-      return;
-    }
     // Get lock first so that we won't wait in ConcurrentMap.compute().
     synchronized (lock) {
       log.debug("Segment [%s] is gone.", segment.getId());
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 321c33fa1db..24489e60acd 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -374,9 +374,7 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
                                                
.withNumRows(metadata.get().getNumRows())
                                                .build();
               } else {
-                // mark it for refresh, however, this case shouldn't arise by 
design
-                markSegmentAsNeedRefresh(segmentId);
-                log.debug("SchemaMetadata for segmentId[%s] is absent.", 
segmentId);
+                
markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
                 return availableSegmentMetadata;
               }
             }
@@ -403,9 +401,7 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
                                        
.withNumRows(metadata.get().getNumRows())
                                        .build();
     } else {
-      // mark it for refresh, however, this case shouldn't arise by design
-      markSegmentAsNeedRefresh(segmentId);
-      log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
+      markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
     }
     return availableSegmentMetadata;
   }
@@ -686,22 +682,14 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
     final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
 
     if (segmentsMap != null && !segmentsMap.isEmpty()) {
-      for (SegmentId segmentId : segmentsMap.keySet()) {
+      for (Map.Entry<SegmentId, AvailableSegmentMetadata> entry : 
segmentsMap.entrySet()) {
+        SegmentId segmentId = entry.getKey();
         Optional<SchemaPayloadPlus> optionalSchema = 
segmentSchemaCache.getSchemaForSegment(segmentId);
         if (optionalSchema.isPresent()) {
           RowSignature rowSignature = 
optionalSchema.get().getSchemaPayload().getRowSignature();
           mergeRowSignature(columnTypes, rowSignature);
         } else {
-          log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
-
-          ImmutableDruidDataSource druidDataSource =
-              
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());
-
-          if (druidDataSource != null && druidDataSource.getSegment(segmentId) 
!= null) {
-            // mark it for refresh only if it is used
-            // however, this case shouldn't arise by design
-            markSegmentAsNeedRefresh(segmentId);
-          }
+          markSegmentForRefreshIfNeeded(entry.getValue().getSegment());
         }
       }
     } else {
@@ -876,4 +864,32 @@ public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCach
       return Optional.empty();
     }
   }
+
+  /**
+   * A segment schema can go missing. To ensure smooth functioning, segment is 
marked for refresh.
+   * It need not be refreshed in the following scenarios:
+   * - Tombstone segment, since they do not have any schema.
+   * - Unused segment which hasn't been yet removed from the cache.
+   * Any other scenario needs investigation.
+   */
+  private void markSegmentForRefreshIfNeeded(DataSegment segment)
+  {
+    SegmentId id = segment.getId();
+
+    log.debug("SchemaMetadata for segmentId [%s] is absent.", id);
+
+    if (segment.isTombstone()) {
+      log.debug("Skipping refresh for tombstone segment [%s].", id);
+      return;
+    }
+
+    ImmutableDruidDataSource druidDataSource =
+        
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segment.getDataSource());
+
+    if (druidDataSource != null && druidDataSource.getSegment(id) != null) {
+      markSegmentAsNeedRefresh(id);
+    } else {
+      log.debug("Skipping refresh for unused segment [%s].", id);
+    }
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 0c099cb551c..22b0890e855 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.InternalQueryConfig;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
@@ -2220,74 +2221,109 @@ public class CoordinatorSegmentMetadataCacheTest 
extends CoordinatorSegmentMetad
   }
 
   @Test
-  public void testTombstoneSegmentIsNotAdded() throws InterruptedException
+  public void testTombstoneSegmentIsNotRefreshed() throws IOException
   {
-    String datasource = "newSegmentAddTest";
-    CountDownLatch addSegmentLatch = new CountDownLatch(1);
+    String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} 
}";
+
+    TestHelper.makeJsonMapper();
+    InternalQueryConfig internalQueryConfig = MAPPER.readValue(
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(brokerInternalQueryConfigJson, 
InternalQueryConfig.class)
+        ),
+        InternalQueryConfig.class
+    );
+
+    QueryLifecycleFactory factoryMock = 
EasyMock.createMock(QueryLifecycleFactory.class);
+    QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
 
     CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
-        getQueryLifecycleFactory(walker),
+        factoryMock,
         serverView,
         SEGMENT_CACHE_CONFIG_DEFAULT,
         new NoopEscalator(),
-        new InternalQueryConfig(),
+        internalQueryConfig,
         new NoopServiceEmitter(),
         segmentSchemaCache,
         backFillQueue,
         sqlSegmentsMetadataManager,
         segmentsMetadataManagerConfigSupplier
-    )
-    {
-      @Override
-      public void addSegment(final DruidServerMetadata server, final 
DataSegment segment)
-      {
-        super.addSegment(server, segment);
-        if (datasource.equals(segment.getDataSource())) {
-          addSegmentLatch.countDown();
-        }
-      }
-    };
+    );
 
-    schema.onLeaderStart();
-    schema.awaitInitialization();
+    Map<String, Object> queryContext = ImmutableMap.of(
+        QueryContexts.PRIORITY_KEY, 5,
+        QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
+    );
 
-    DataSegment segment = new DataSegment(
-        datasource,
-        Intervals.of("2001/2002"),
-        "1",
-        Collections.emptyMap(),
-        Collections.emptyList(),
-        Collections.emptyList(),
-        TombstoneShardSpec.INSTANCE,
-        null,
+    DataSegment segment = newSegment("test", 0);
+    DataSegment tombstone = DataSegment.builder()
+                                       .dataSource("test")
+                                       
.interval(Intervals.of("2012-01-01/2012-01-02"))
+                                       
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
+                                       .shardSpec(new TombstoneShardSpec())
+                                       .loadSpec(Collections.singletonMap(
+                                           "type",
+                                           DataSegment.TOMBSTONE_LOADSPEC_TYPE
+                                       ))
+                                       .size(0)
+                                       .build();
+
+    final DruidServer historicalServer = druidServers.stream()
+                                                     .filter(s -> 
s.getType().equals(ServerType.HISTORICAL))
+                                                     .findAny()
+                                                     .orElse(null);
+
+    Assert.assertNotNull(historicalServer);
+    final DruidServerMetadata historicalServerMetadata = 
historicalServer.getMetadata();
+
+    schema.addSegment(historicalServerMetadata, segment);
+    schema.addSegment(historicalServerMetadata, tombstone);
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    List<SegmentId> segmentIterable = ImmutableList.of(segment.getId(), 
tombstone.getId());
+
+    SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
+        new TableDataSource(segment.getDataSource()),
+        new MultipleSpecificSegmentSpec(
+            segmentIterable.stream()
+                           .filter(id -> !id.equals(tombstone.getId()))
+                           .map(SegmentId::toDescriptor)
+                           .collect(Collectors.toList())
+        ),
+        new AllColumnIncluderator(),
+        false,
+        queryContext,
+        EnumSet.of(SegmentMetadataQuery.AnalysisType.AGGREGATORS),
+        false,
         null,
-        0
+        null
     );
 
-    Assert.assertEquals(6, schema.getTotalSegments());
+    EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
+    EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, 
AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
+            
.andReturn(QueryResponse.withEmptyContext(Sequences.empty())).once();
 
-    serverView.addSegment(segment, ServerType.HISTORICAL);
-    Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
-    Assert.assertEquals(0, addSegmentLatch.getCount());
+    EasyMock.replay(factoryMock, lifecycleMock);
 
-    Assert.assertEquals(6, schema.getTotalSegments());
-    List<AvailableSegmentMetadata> metadatas = schema
-        .getSegmentMetadataSnapshot()
-        .values()
-        .stream()
-        .filter(metadata -> 
datasource.equals(metadata.getSegment().getDataSource()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(0, metadatas.size());
+    schema.refresh(Collections.singleton(segment.getId()), 
Collections.singleton("test"));
 
-    serverView.removeSegment(segment, ServerType.HISTORICAL);
-    Assert.assertEquals(6, schema.getTotalSegments());
-    metadatas = schema
-        .getSegmentMetadataSnapshot()
-        .values()
-        .stream()
-        .filter(metadata -> 
datasource.equals(metadata.getSegment().getDataSource()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(0, metadatas.size());
+    // verify that metadata query is not issued for tombstone segment
+    EasyMock.verify(factoryMock, lifecycleMock);
+
+    // Verify that datasource schema building logic doesn't mark the tombstone 
segment for refresh
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    AvailableSegmentMetadata availableSegmentMetadata = 
schema.getAvailableSegmentMetadata("test", tombstone.getId());
+    Assert.assertNotNull(availableSegmentMetadata);
+    // fetching metadata for tombstone segment shouldn't mark it for refresh
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
+    schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+    Assert.assertEquals(1, metadatas.stream().filter(metadata -> 
metadata.getSegment().isTombstone()).count());
+
+    // iterating over entire metadata doesn't cause tombstone to be marked for 
refresh
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
   }
 
   @Test
@@ -2384,6 +2420,27 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
 
     
Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
     
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
+
+    AvailableSegmentMetadata availableSegmentMetadata =
+        schema.getAvailableSegmentMetadata(dataSource, 
segments.get(0).getId());
+
+    Assert.assertNotNull(availableSegmentMetadata);
+    // fetching metadata for unused segment shouldn't mark it for refresh
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
+
+    Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
+    schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+    Assert.assertEquals(
+        1,
+        metadatas.stream()
+                 .filter(
+                     metadata ->
+                         
metadata.getSegment().getId().equals(segments.get(0).getId())).count()
+    );
+
+    // iterating over entire metadata doesn't cause unused segment to be marked 
for refresh
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
   }
 
   private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int 
columns)
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index d9b24ed011d..b613c602f63 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.client.InternalQueryConfig;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -1139,71 +1140,109 @@ public class BrokerSegmentMetadataCacheTest extends 
BrokerSegmentMetadataCacheTe
   }
 
   @Test
-  public void testTombstoneSegmentIsNotAdded() throws InterruptedException
+  public void testTombstoneSegmentIsNotRefreshed() throws IOException
   {
-    String datasource = "newSegmentAddTest";
-    CountDownLatch addSegmentLatch = new CountDownLatch(1);
+    String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} 
}";
+
+    TestHelper.makeJsonMapper();
+    InternalQueryConfig internalQueryConfig = MAPPER.readValue(
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(brokerInternalQueryConfigJson, 
InternalQueryConfig.class)
+        ),
+        InternalQueryConfig.class
+    );
+
+    QueryLifecycleFactory factoryMock = 
EasyMock.createMock(QueryLifecycleFactory.class);
+    QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
+
     BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
-        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+        factoryMock,
         serverView,
-        BrokerSegmentMetadataCacheConfig.create(),
+        SEGMENT_CACHE_CONFIG_DEFAULT,
         new NoopEscalator(),
-        new InternalQueryConfig(),
+        internalQueryConfig,
         new NoopServiceEmitter(),
         new PhysicalDatasourceMetadataFactory(globalTableJoinable, 
segmentManager),
         new NoopCoordinatorClient(),
         CentralizedDatasourceSchemaConfig.create()
-    )
-    {
-      @Override
-      public void addSegment(final DruidServerMetadata server, final 
DataSegment segment)
-      {
-        super.addSegment(server, segment);
-        if (datasource.equals(segment.getDataSource())) {
-          addSegmentLatch.countDown();
-        }
-      }
-    };
+    );
 
-    schema.start();
-    schema.awaitInitialization();
+    Map<String, Object> queryContext = ImmutableMap.of(
+        QueryContexts.PRIORITY_KEY, 5,
+        QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
+    );
 
-    DataSegment segment = new DataSegment(
-        datasource,
-        Intervals.of("2001/2002"),
-        "1",
-        Collections.emptyMap(),
-        Collections.emptyList(),
-        Collections.emptyList(),
-        TombstoneShardSpec.INSTANCE,
-        null,
+    DataSegment segment = newSegment("test", 0);
+    DataSegment tombstone = DataSegment.builder()
+                                       .dataSource("test")
+                                       
.interval(Intervals.of("2012-01-01/2012-01-02"))
+                                       
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
+                                       .shardSpec(new TombstoneShardSpec())
+                                       .loadSpec(Collections.singletonMap(
+                                           "type",
+                                           DataSegment.TOMBSTONE_LOADSPEC_TYPE
+                                       ))
+                                       .size(0)
+                                       .build();
+
+    final ImmutableDruidServer historicalServer = druidServers.stream()
+                                                     .filter(s -> 
s.getType().equals(ServerType.HISTORICAL))
+                                                     .findAny()
+                                                     .orElse(null);
+
+    Assert.assertNotNull(historicalServer);
+    final DruidServerMetadata historicalServerMetadata = 
historicalServer.getMetadata();
+
+    schema.addSegment(historicalServerMetadata, segment);
+    schema.addSegment(historicalServerMetadata, tombstone);
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    List<SegmentId> segmentIterable = ImmutableList.of(segment.getId(), 
tombstone.getId());
+
+    SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
+        new TableDataSource(segment.getDataSource()),
+        new MultipleSpecificSegmentSpec(
+            segmentIterable.stream()
+                           .filter(id -> !id.equals(tombstone.getId()))
+                           .map(SegmentId::toDescriptor)
+                           .collect(Collectors.toList())
+        ),
+        new AllColumnIncluderator(),
+        false,
+        queryContext,
+        EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+        false,
         null,
-        0
+        null
     );
 
-    Assert.assertEquals(6, schema.getTotalSegments());
+    EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
+    EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, 
AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
+            .andReturn(QueryResponse.withEmptyContext(Sequences.empty()));
 
-    serverView.addSegment(segment, ServerType.HISTORICAL);
-    Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
-    Assert.assertEquals(0, addSegmentLatch.getCount());
+    EasyMock.replay(factoryMock, lifecycleMock);
 
-    Assert.assertEquals(6, schema.getTotalSegments());
-    List<AvailableSegmentMetadata> metadatas = schema
-        .getSegmentMetadataSnapshot()
-        .values()
-        .stream()
-        .filter(metadata -> 
datasource.equals(metadata.getSegment().getDataSource()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(0, metadatas.size());
-
-    serverView.removeSegment(segment, ServerType.HISTORICAL);
-    Assert.assertEquals(6, schema.getTotalSegments());
-    metadatas = schema
-        .getSegmentMetadataSnapshot()
-        .values()
-        .stream()
-        .filter(metadata -> 
datasource.equals(metadata.getSegment().getDataSource()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(0, metadatas.size());
+    Set<SegmentId> segmentsToRefresh = new HashSet<>();
+    segmentsToRefresh.add(segment.getId());
+    schema.refresh(segmentsToRefresh, Collections.singleton("test"));
+
+    // verify that metadata query is not issued for tombstone segment
+    EasyMock.verify(factoryMock, lifecycleMock);
+
+    // Verify that datasource schema building logic doesn't mark the tombstone 
segment for refresh
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    AvailableSegmentMetadata availableSegmentMetadata = 
schema.getAvailableSegmentMetadata("test", tombstone.getId());
+    Assert.assertNotNull(availableSegmentMetadata);
+    // fetching metadata for tombstone segment shouldn't mark it for refresh
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
+    schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+    Assert.assertEquals(1, metadatas.stream().filter(metadata -> 
metadata.getSegment().isTombstone()).count());
+
+    // iterating over entire metadata doesn't cause tombstone to be marked for 
refresh
+    
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to