abhishekrb19 commented on code in PR #16044:
URL: https://github.com/apache/druid/pull/16044#discussion_r1516173728


##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -293,29 +292,32 @@ public void testPollPeriodicallyAndOnDemandInterleave() 
throws Exception
   @Test
   public void 
testPrepareImmutableDataSourceWithUsedSegmentsAwaitsPollOnRestart() throws 
IOException
   {
-    DataSegment newSegment = pollThenStopThenStartIntro();
+    publishWikiSegments();
+    DataSegment koalaSegment = pollThenStopThenStartIntro();

Review Comment:
   It isn't immediately obvious why a koala segment is being returned by 
`pollThenStopThenStartIntro`. In its implementation, I see that the function 
publishes a segment for the koala datasource at the end. I'm guessing that's what 
the "start intro" in the function name denotes. Should we rename the function 
to `pollThenStopThenPublishKoalaSegment` or break up the functionality?



##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -343,123 +346,122 @@ private DataSegment pollThenStopThenStartIntro() throws 
IOException
     sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
     
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
     Assert.assertEquals(
-        ImmutableSet.of("wikipedia"),
+        ImmutableSet.of(DS.WIKI),
         sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
     );
-    DataSegment newSegment = createNewSegment1("wikipedia2");
-    publisher.publishSegment(newSegment);
+    final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+    publisher.publishSegment(koalaSegment);
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    return newSegment;
+    return koalaSegment;
   }
 
   @Test
-  public void testPollWithCorruptedSegment()
+  public void testPollWithCorruptedSegment() throws IOException
   {
+    publishWikiSegments();
+
     //create a corrupted segment entry in segments table, which tests
     //that overall loading of segments from database continues to work
     //even in one of the entries are corrupted.

Review Comment:
   Make this comment a javadoc and fix a typo:
   ```suggestion
   ```



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -347,70 +347,68 @@ public void populateUsedFlagLastUpdatedAsync()
   @VisibleForTesting
   void populateUsedFlagLastUpdated()
   {
-    String segmentsTable = getSegmentsTable();
+    final String segmentsTable = getSegmentsTable();
     log.info(
-        "Populating used_status_last_updated with non-NULL values for unused 
segments in [%s]",
+        "Populating column 'used_status_last_updated' with non-NULL values for 
unused segments in table[%s].",
         segmentsTable
     );
 
-    int limit = 100;
+    final int batchSize = 100;
     int totalUpdatedEntries = 0;
 
+    // Update the rows in batches of size 100
     while (true) {
-      List<String> segmentsToUpdate = new ArrayList<>(100);
+      final List<String> segmentsToUpdate = new ArrayList<>(batchSize);
+      int numUpdatedRows;
       try {
         connector.retryWithHandle(
-            new HandleCallback<Void>()
-            {
-              @Override
-              public Void withHandle(Handle handle)
-              {
-                segmentsToUpdate.addAll(handle.createQuery(
-                    StringUtils.format(
-                        "SELECT id FROM %1$s WHERE used_status_last_updated IS 
NULL and used = :used %2$s",
-                        segmentsTable,
-                        connector.limitClause(limit)
-                    )
-                ).bind("used", false).mapTo(String.class).list());
-                return null;
-              }
+            handle -> {
+              segmentsToUpdate.addAll(handle.createQuery(
+                  StringUtils.format(
+                      "SELECT id FROM %1$s WHERE used_status_last_updated IS 
NULL and used = :used %2$s",
+                      segmentsTable,
+                      connector.limitClause(batchSize)
+                  )
+              ).bind("used", false).mapTo(String.class).list());
+              return null;
             }
         );
 
         if (segmentsToUpdate.isEmpty()) {
-          // We have no segments to process
           break;
         }
 
-        connector.retryWithHandle(
-            new HandleCallback<Void>()
-            {
-              @Override
-              public Void withHandle(Handle handle)
-              {
-                Batch updateBatch = handle.createBatch();
-                String sql = "UPDATE %1$s SET used_status_last_updated = 
'%2$s' WHERE id = '%3$s'";
-                String now = DateTimes.nowUtc().toString();
-                for (String id : segmentsToUpdate) {
-                  updateBatch.add(StringUtils.format(sql, segmentsTable, now, 
id));
-                }
-                updateBatch.execute();
-                return null;
+        numUpdatedRows = connector.retryWithHandle(
+            handle -> {
+              final Batch updateBatch = handle.createBatch();
+              final String sql = "UPDATE %1$s SET used_status_last_updated = 
'%2$s' WHERE id = '%3$s'";
+              String now = DateTimes.nowUtc().toString();
+              for (String id : segmentsToUpdate) {
+                updateBatch.add(StringUtils.format(sql, segmentsTable, now, 
id));
               }
+              int[] results = updateBatch.execute();
+              return Arrays.stream(results).sum();
             }
         );
+        totalUpdatedEntries += numUpdatedRows;
       }
       catch (Exception e) {
         log.warn(e, "Population of used_status_last_updated in [%s] has 
failed. There may be unused segments with"
                     + " NULL values for used_status_last_updated that won't be 
killed!", segmentsTable);

Review Comment:
   ```suggestion
           log.warn(e, "Populating column 'used_status_last_updated' in 
table[%s] has failed. There may be unused segments with"
                       + " NULL values for 'used_status_last_updated' that 
won't be killed!", segmentsTable);
   ```



##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -343,123 +346,122 @@ private DataSegment pollThenStopThenStartIntro() throws 
IOException
     sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
     
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
     Assert.assertEquals(
-        ImmutableSet.of("wikipedia"),
+        ImmutableSet.of(DS.WIKI),
         sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
     );
-    DataSegment newSegment = createNewSegment1("wikipedia2");
-    publisher.publishSegment(newSegment);
+    final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+    publisher.publishSegment(koalaSegment);
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    return newSegment;
+    return koalaSegment;
   }
 

Review Comment:
   ```suggestion
     /**
      * Create a corrupted segment entry in the segments table to test
      * whether the overall loading of segments from the database continues to 
work
      * even if one of the entries is corrupted.
      */
   ```



##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -343,123 +346,122 @@ private DataSegment pollThenStopThenStartIntro() throws 
IOException
     sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
     
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
     Assert.assertEquals(
-        ImmutableSet.of("wikipedia"),
+        ImmutableSet.of(DS.WIKI),
         sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
     );
-    DataSegment newSegment = createNewSegment1("wikipedia2");
-    publisher.publishSegment(newSegment);
+    final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+    publisher.publishSegment(koalaSegment);
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    return newSegment;
+    return koalaSegment;
   }
 
   @Test
-  public void testPollWithCorruptedSegment()
+  public void testPollWithCorruptedSegment() throws IOException
   {
+    publishWikiSegments();
+
     //create a corrupted segment entry in segments table, which tests
     //that overall loading of segments from database continues to work
     //even in one of the entries are corrupted.
-    publisher.publishSegment(
-        "corrupt-segment-id",
-        "corrupt-datasource",
-        "corrupt-create-date",
-        "corrupt-start-date",
-        "corrupt-end-date",
-        true,
-        "corrupt-version",
-        true,
-        StringUtils.toUtf8("corrupt-payload"),
-        "corrupt-last-used-date"
-    );
+    final DataSegment corruptSegment = 
DataSegment.builder(wikiSegment1).dataSource("corrupt-datasource").build();
+    publisher.publishSegment(corruptSegment);
+    updateSegmentPayload(corruptSegment, 
StringUtils.toUtf8("corrupt-payload"));
 
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
     Assert.assertEquals(
-        "wikipedia",
+        DS.WIKI,
         
Iterables.getOnlyElement(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).getName()
     );
   }
 
   @Test
   public void testGetUnusedSegmentIntervals() throws IOException
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
 
-    // We alter the segment table to allow nullable used_status_last_updated 
in order to test compatibility during druid upgrade from version without 
used_status_last_updated.
-    derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable();
+    // Allow null values of used_status_last_updated to test upgrade from 
older Druid versions
+    allowUsedFlagLastUpdatedToBeNullable();
 
     
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
-    int numChangedSegments = 
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia");
+    int numChangedSegments = 
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.WIKI);
     Assert.assertEquals(2, numChangedSegments);
 
-    String newDs = "newDataSource";
-    final DataSegment newSegment = createSegment(
-        newDs,
+    // Publish an unused segment with used_status_last_updated 2 hours ago
+    final DataSegment koalaSegment1 = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-    publish(newSegment, false, 
DateTimes.nowUtc().minus(Duration.parse("PT7200S").getMillis()));
+    publishUnusedSegments(koalaSegment1);
+    updateUsedStatusLastUpdated(koalaSegment1, 
DateTimes.nowUtc().minus(Duration.standardHours(2)));
 
-    final DataSegment newSegment2 = createSegment(
-        newDs,
+    // Publish an unused with used_status_last_updated 2 days ago
+    final DataSegment koalaSegment2 = createSegment(
+        DS.KOALA,
         "2017-10-16T00:00:00.000/2017-10-17T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-    publish(newSegment2, false, 
DateTimes.nowUtc().minus(Duration.parse("PT172800S").getMillis()));
+    publishUnusedSegments(koalaSegment2);
+    updateUsedStatusLastUpdated(koalaSegment2, 
DateTimes.nowUtc().minus(Duration.standardDays(2)));
 
-    final DataSegment newSegment3 = createSegment(
-        newDs,
+    // Publish an unused segment and set used_status_last_updated to null
+    final DataSegment koalaSegment3 = createSegment(
+        DS.KOALA,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-    publish(newSegment3, false, null);
+    publishUnusedSegments(koalaSegment3);
+    updateUsedStatusLastUpdatedToNull(koalaSegment3);
 
     Assert.assertEquals(
-        ImmutableList.of(segment2.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", 
null, DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        ImmutableList.of(wikiSegment2.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null, 
DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
 
     // Test the DateTime maxEndTime argument of getUnusedSegmentIntervals
     Assert.assertEquals(
-        ImmutableList.of(segment2.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", 
null, DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        ImmutableList.of(wikiSegment2.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null, 
DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
     Assert.assertEquals(
-        ImmutableList.of(segment1.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", 
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1, 
DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        ImmutableList.of(wikiSegment1.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, 
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1, 
DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
     Assert.assertEquals(
         ImmutableList.of(),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", 
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1, 
DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, 
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1, 
DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
 
     Assert.assertEquals(
-        ImmutableList.of(segment2.getInterval(), segment1.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", 
null, DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        ImmutableList.of(wikiSegment2.getInterval(), 
wikiSegment1.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null, 
DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
 
     // Test a buffer period that should exclude some segments
 
     // The wikipedia datasource has segments generated with last used time 
equal to roughly the time of test run. None of these segments should be 
selected with a bufer period of 1 day
     Assert.assertEquals(
         ImmutableList.of(),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", 
DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, 
DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, 
DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, 
DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
     );
 
-    // One of the 3 segments in newDs has a null used_status_last_updated 
which should mean getUnusedSegmentIntervals never returns it
-    // One of the 3 segments in newDs has a used_status_last_updated older 
than 1 day which means it should also be returned
-    // The last of the 3 segemns in newDs has a used_status_last_updated date 
less than one day and should not be returned
+    // koalaSegment3 has a null used_status_last_updated which should mean 
getUnusedSegmentIntervals never returns it
+    // koalaSegment2 has a used_status_last_updated older than 1 day which 
means it should also be returned

Review Comment:
   ```suggestion
       // koalaSegment2 has a used_status_last_updated older than 1 day which 
means it should be returned
   ```



##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -511,346 +506,312 @@ public void testMarkSegmentAsUnused() throws 
IOException, InterruptedException
     sqlSegmentsMetadataManager.poll();
     
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    publisher.publishSegment(newSegment);
-    awaitDataSourceAppeared(newDataSource);
-    
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+    publisher.publishSegment(koalaSegment);
+    awaitDataSourceAppeared(DS.KOALA);
+    
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
 
-    
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId()));
-    awaitDataSourceDisappeared(newDataSource);
-    
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+    
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(koalaSegment.getId()));
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    awaitDataSourceDisappeared(DS.KOALA);
+    
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
+    System.out.println("Total time:" + stopwatch.millisElapsed());

Review Comment:
   Are the stopwatch and the print statement required?



##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -343,123 +346,122 @@ private DataSegment pollThenStopThenStartIntro() throws 
IOException
     sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
     
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
     Assert.assertEquals(
-        ImmutableSet.of("wikipedia"),
+        ImmutableSet.of(DS.WIKI),
         sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
     );
-    DataSegment newSegment = createNewSegment1("wikipedia2");
-    publisher.publishSegment(newSegment);
+    final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+    publisher.publishSegment(koalaSegment);
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    return newSegment;
+    return koalaSegment;
   }
 
   @Test
-  public void testPollWithCorruptedSegment()
+  public void testPollWithCorruptedSegment() throws IOException
   {
+    publishWikiSegments();
+
     //create a corrupted segment entry in segments table, which tests
     //that overall loading of segments from database continues to work
     //even in one of the entries are corrupted.
-    publisher.publishSegment(
-        "corrupt-segment-id",
-        "corrupt-datasource",
-        "corrupt-create-date",
-        "corrupt-start-date",
-        "corrupt-end-date",
-        true,
-        "corrupt-version",
-        true,
-        StringUtils.toUtf8("corrupt-payload"),
-        "corrupt-last-used-date"
-    );
+    final DataSegment corruptSegment = 
DataSegment.builder(wikiSegment1).dataSource("corrupt-datasource").build();
+    publisher.publishSegment(corruptSegment);
+    updateSegmentPayload(corruptSegment, 
StringUtils.toUtf8("corrupt-payload"));
 
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
     Assert.assertEquals(
-        "wikipedia",
+        DS.WIKI,
         
Iterables.getOnlyElement(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).getName()
     );
   }
 
   @Test
   public void testGetUnusedSegmentIntervals() throws IOException
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
 
-    // We alter the segment table to allow nullable used_status_last_updated 
in order to test compatibility during druid upgrade from version without 
used_status_last_updated.
-    derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable();
+    // Allow null values of used_status_last_updated to test upgrade from 
older Druid versions
+    allowUsedFlagLastUpdatedToBeNullable();
 
     
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
-    int numChangedSegments = 
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia");
+    int numChangedSegments = 
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.WIKI);
     Assert.assertEquals(2, numChangedSegments);
 
-    String newDs = "newDataSource";
-    final DataSegment newSegment = createSegment(
-        newDs,
+    // Publish an unused segment with used_status_last_updated 2 hours ago
+    final DataSegment koalaSegment1 = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-    publish(newSegment, false, 
DateTimes.nowUtc().minus(Duration.parse("PT7200S").getMillis()));
+    publishUnusedSegments(koalaSegment1);
+    updateUsedStatusLastUpdated(koalaSegment1, 
DateTimes.nowUtc().minus(Duration.standardHours(2)));
 
-    final DataSegment newSegment2 = createSegment(
-        newDs,
+    // Publish an unused with used_status_last_updated 2 days ago

Review Comment:
   ```suggestion
       // Publish an unused segment with used_status_last_updated 2 days ago
   ```



##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -511,346 +506,312 @@ public void testMarkSegmentAsUnused() throws 
IOException, InterruptedException
     sqlSegmentsMetadataManager.poll();
     
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    publisher.publishSegment(newSegment);
-    awaitDataSourceAppeared(newDataSource);
-    
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+    publisher.publishSegment(koalaSegment);
+    awaitDataSourceAppeared(DS.KOALA);
+    
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
 
-    
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId()));
-    awaitDataSourceDisappeared(newDataSource);
-    
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+    
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(koalaSegment.getId()));
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    awaitDataSourceDisappeared(DS.KOALA);
+    
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
+    System.out.println("Total time:" + stopwatch.millisElapsed());
   }
 
-  private void awaitDataSourceAppeared(String newDataSource) throws 
InterruptedException
+  private void awaitDataSourceAppeared(String datasource) throws 
InterruptedException
   {
-    while 
(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource)
 == null) {
-      Thread.sleep(1000);
+    while 
(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(datasource) 
== null) {
+      Thread.sleep(5);
     }
   }
 
   private void awaitDataSourceDisappeared(String dataSource) throws 
InterruptedException
   {
     while 
(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(dataSource) 
!= null) {
-      Thread.sleep(1000);
+      Thread.sleep(5);
     }
   }
 
   @Test
   public void testMarkAsUsedNonOvershadowedSegments() throws Exception
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment1 = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    final DataSegment newSegment2 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment2 = createSegment(
+        DS.KOALA,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-16T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        1
+        "2017-10-16T20:19:12.565Z"
     );
 
-    // Overshadowed by newSegment2
-    final DataSegment newSegment3 = createSegment(
-        newDataSource,
+    // Overshadowed by koalaSegment2
+    final DataSegment koalaSegment3 = createSegment(
+        DS.KOALA,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        1
+        "2017-10-15T20:19:12.565Z"
     );
 
-    publish(newSegment1, false);
-    publish(newSegment2, false);
-    publish(newSegment3, false);
-    final ImmutableSet<String> segmentIds = ImmutableSet.of(
-        newSegment1.getId().toString(),
-        newSegment2.getId().toString(),
-        newSegment3.getId().toString()
+    publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
+    final Set<String> segmentIds = ImmutableSet.of(
+        koalaSegment1.getId().toString(),
+        koalaSegment2.getId().toString(),
+        koalaSegment3.getId().toString()
     );
 
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
-    Assert.assertEquals(2, 
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource, 
segmentIds));
+    Assert.assertEquals(2, 
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA, 
segmentIds));
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, 
koalaSegment2),
         
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
   }
 
-  @Test(expected = UnknownSegmentIdsException.class)
+  @Test
   public void testMarkAsUsedNonOvershadowedSegmentsInvalidDataSource() throws 
Exception
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createNewSegment1(newDataSource);
+    final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+    final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
 
-    final DataSegment newSegment2 = createNewSegment1(newDataSource);
-
-    publish(newSegment1, false);
-    publish(newSegment2, false);
+    publishUnusedSegments(koalaSegment1, koalaSegment2);
     final ImmutableSet<String> segmentIds =
-        ImmutableSet.of(newSegment1.getId().toString(), 
newSegment2.getId().toString());
+        ImmutableSet.of(koalaSegment1.getId().toString(), 
koalaSegment2.getId().toString());
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
-    // none of the segments are in data source
-    Assert.assertEquals(0, 
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource", 
segmentIds));
+
+    Assert.assertThrows(
+        UnknownSegmentIdsException.class,
+        () -> 
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource", 
segmentIds)
+    );
   }
 
-  @Test(expected = UnknownSegmentIdsException.class)
-  public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds() 
throws UnknownSegmentIdsException
+  @Test
+  public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds()
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
-    final DataSegment newSegment2 = createNewSegment1(newDataSource);
+    final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+    final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
 
     final ImmutableSet<String> segmentIds =
-        ImmutableSet.of(newSegment1.getId().toString(), 
newSegment2.getId().toString());
+        ImmutableSet.of(koalaSegment1.getId().toString(), 
koalaSegment2.getId().toString());
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
-    // none of the segments are in data source
-    Assert.assertEquals(0, 
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource, 
segmentIds));
+
+    Assert.assertThrows(

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to