This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f203725dd3 Clean up SqlSegmentsMetadataManager and corresponding tests (#16044)
5f203725dd3 is described below

commit 5f203725dd39f240fc4ba5ef8181bb80a972d7c3
Author: Kashif Faraz <kashif.fa...@gmail.com>
AuthorDate: Fri Mar 8 07:34:51 2024 +0530

    Clean up SqlSegmentsMetadataManager and corresponding tests (#16044)
    
    Changes:
    
    Improve `SqlSegmentsMetadataManager`
    - Break the loop in `populateUsedFlagLastUpdated` before going to sleep if there are no more segments to update
    - Add comments and clean up logs
    
    Refactor `SqlSegmentsMetadataManagerTest`
    - Merge `SqlSegmentsMetadataManagerEmptyTest` into this test
    - Add method `testPollEmpty`
    - Shave a few seconds off of the tests by reducing poll duration
    - Simplify creation of test segments
    - Some renames here and there
    - Remove unused methods
    - Move `TestDerbyConnector.allowUsedFlagLastUpdatedToBeNullable` to this class
    
    Other minor changes
    - Add javadoc to `NoneShardSpec`
    - Use lambda in `SQLMetadataSegmentPublisher`
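    
    For illustration, the early-break batching pattern that `populateUsedFlagLastUpdated`
    now follows can be sketched as below. This is a minimal sketch, not the actual Druid
    code: the `SegmentStore` interface is a hypothetical stand-in for the JDBI queries,
    while the batch size of 100 and the 10-second pause match the values in the diff.
    
        import java.util.List;
        
        public class BatchUpdateLoopSketch
        {
          private static final int BATCH_SIZE = 100;
        
          // Hypothetical stand-in for the JDBI handle calls in the real class.
          interface SegmentStore
          {
            // Returns up to 'limit' ids of unused segments whose
            // used_status_last_updated column is still NULL.
            List<String> findIdsWithNullLastUpdated(int limit);
        
            // Stamps used_status_last_updated for the given ids and
            // returns the number of rows actually updated.
            int updateLastUpdated(List<String> ids);
          }
        
          public int populate(SegmentStore store) throws InterruptedException
          {
            int totalUpdated = 0;
            while (true) {
              final List<String> batch = store.findIdsWithNullLastUpdated(BATCH_SIZE);
              if (batch.isEmpty()) {
                break;
              }
        
              final int updatedRows = store.updateLastUpdated(batch);
              totalUpdated += updatedRows;
        
              // Break before sleeping: a partial batch means nothing is left to update.
              if (updatedRows == batch.size() && updatedRows < BATCH_SIZE) {
                break;
              }
        
              // Pause between batches to avoid hammering the metadata store.
              Thread.sleep(10_000);
            }
            return totalUpdated;
          }
        }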
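    
    Similarly, the lambda cleanup in `SQLMetadataSegmentPublisher` is the standard
    refactor for a single-abstract-method interface such as JDBI's `HandleCallback`.
    A before/after sketch under assumed table and query names (illustrative only):
    
        import java.util.List;
        import java.util.Map;
        
        import org.skife.jdbi.v2.DBI;
        import org.skife.jdbi.v2.Handle;
        import org.skife.jdbi.v2.tweak.HandleCallback;
        
        public class LambdaRefactorSketch
        {
          // Before: anonymous inner class implementing HandleCallback.
          static List<Map<String, Object>> findByIdOld(DBI dbi, String segmentId)
          {
            return dbi.withHandle(
                new HandleCallback<List<Map<String, Object>>>()
                {
                  @Override
                  public List<Map<String, Object>> withHandle(Handle handle)
                  {
                    return handle.createQuery("SELECT id FROM segments WHERE id = :id")
                                 .bind("id", segmentId)
                                 .list();
                  }
                }
            );
          }
        
          // After: HandleCallback has a single abstract method, so a lambda suffices.
          static List<Map<String, Object>> findByIdNew(DBI dbi, String segmentId)
          {
            return dbi.withHandle(
                handle -> handle.createQuery("SELECT id FROM segments WHERE id = :id")
                                .bind("id", segmentId)
                                .list()
            );
          }
        }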
---
 .../druid/timeline/partition/NoneShardSpec.java    |  10 +
 .../metadata/SQLMetadataSegmentPublisher.java      |  35 +-
 .../druid/metadata/SqlSegmentsMetadataManager.java |  85 ++-
 .../SqlSegmentsMetadataManagerEmptyTest.java       | 113 ----
 .../metadata/SqlSegmentsMetadataManagerTest.java   | 739 +++++++++++----------
 .../apache/druid/metadata/TestDerbyConnector.java  |  26 -
 6 files changed, 433 insertions(+), 575 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
index 4661b3ab73b..e29d843c98b 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
@@ -29,6 +29,16 @@ import java.util.List;
 import java.util.Map;
 
 /**
+ * {@link ShardSpec} with no partitioning in a time chunk, i.e. a single segment
+ * per time chunk. This shard spec has been deprecated and is not generated by
+ * the Druid code anymore. The class has been retained only for backward
+ * compatibility reasons.
+ * <p>
+ * For more information, refer to
+ * <a href="https://github.com/apache/druid/pull/6883">PR #6883</a>.
+ *
+ * @deprecated Since Druid 0.15.0. Segments generated by Druid 0.15.0 onwards
+ * do not use this shard spec.
  */
 @Deprecated
 public class NoneShardSpec implements ShardSpec
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
index b69f15edb6b..48a92ecba4e 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
@@ -20,7 +20,6 @@
 package org.apache.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
@@ -28,8 +27,6 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import java.io.IOException;
 import java.util.List;
@@ -79,8 +76,7 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
     );
   }
 
-  @VisibleForTesting
-  void publishSegment(
+  private void publishSegment(
       final String segmentId,
       final String dataSource,
       final String createdDate,
@@ -96,31 +92,18 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
     try {
       final DBI dbi = connector.getDBI();
       List<Map<String, Object>> exists = dbi.withHandle(
-          new HandleCallback<List<Map<String, Object>>>()
-          {
-            @Override
-            public List<Map<String, Object>> withHandle(Handle handle)
-            {
-              return handle.createQuery(
-                  StringUtils.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable())
-              )
-                           .bind("id", segmentId)
-                           .list();
-            }
-          }
+          handle -> handle.createQuery(
+              StringUtils.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable())
+          ).bind("id", segmentId).list()
       );
 
       if (!exists.isEmpty()) {
-        log.info("Found [%s] in DB, not updating DB", segmentId);
+        log.info("Skipping publish of segment[%s] as it already exists in the metadata store.", segmentId);
         return;
       }
 
       dbi.withHandle(
-          new HandleCallback<Void>()
-          {
-            @Override
-            public Void withHandle(Handle handle)
-            {
+          handle ->
               handle.createStatement(statement)
                     .bind("id", segmentId)
                     .bind("dataSource", dataSource)
@@ -132,11 +115,7 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
                     .bind("used", used)
                     .bind("payload", payload)
                     .bind("used_status_last_updated", usedFlagLastUpdated)
-                    .execute();
-
-              return null;
-            }
-          }
+                    .execute()
       );
     }
     catch (Exception e) {
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 1bf8ec534a9..0ddf4a2d2cc 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -64,7 +64,6 @@ import org.skife.jdbi.v2.Query;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import javax.annotation.Nullable;
@@ -72,6 +71,7 @@ import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -335,7 +335,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   {
     ExecutorService executorService = Executors.newSingleThreadExecutor();
     usedFlagLastUpdatedPopulationFuture = executorService.submit(
-        () -> populateUsedFlagLastUpdated()
+        this::populateUsedFlagLastUpdated
     );
   }
 
@@ -347,70 +347,68 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   @VisibleForTesting
   void populateUsedFlagLastUpdated()
   {
-    String segmentsTable = getSegmentsTable();
+    final String segmentsTable = getSegmentsTable();
     log.info(
-        "Populating used_status_last_updated with non-NULL values for unused segments in [%s]",
+        "Populating column 'used_status_last_updated' with non-NULL values for unused segments in table[%s].",
         segmentsTable
     );
 
-    int limit = 100;
+    final int batchSize = 100;
     int totalUpdatedEntries = 0;
 
+    // Update the rows in batches of size 100
     while (true) {
-      List<String> segmentsToUpdate = new ArrayList<>(100);
+      final List<String> segmentsToUpdate = new ArrayList<>(batchSize);
+      int numUpdatedRows;
       try {
         connector.retryWithHandle(
-            new HandleCallback<Void>()
-            {
-              @Override
-              public Void withHandle(Handle handle)
-              {
-                segmentsToUpdate.addAll(handle.createQuery(
-                    StringUtils.format(
-                        "SELECT id FROM %1$s WHERE used_status_last_updated IS NULL and used = :used %2$s",
-                        segmentsTable,
-                        connector.limitClause(limit)
-                    )
-                ).bind("used", false).mapTo(String.class).list());
-                return null;
-              }
+            handle -> {
+              segmentsToUpdate.addAll(handle.createQuery(
+                  StringUtils.format(
+                      "SELECT id FROM %1$s WHERE used_status_last_updated IS NULL and used = :used %2$s",
+                      segmentsTable,
+                      connector.limitClause(batchSize)
+                  )
+              ).bind("used", false).mapTo(String.class).list());
+              return null;
             }
         );
 
         if (segmentsToUpdate.isEmpty()) {
-          // We have no segments to process
           break;
         }
 
-        connector.retryWithHandle(
-            new HandleCallback<Void>()
-            {
-              @Override
-              public Void withHandle(Handle handle)
-              {
-                Batch updateBatch = handle.createBatch();
-                String sql = "UPDATE %1$s SET used_status_last_updated = '%2$s' WHERE id = '%3$s'";
-                String now = DateTimes.nowUtc().toString();
-                for (String id : segmentsToUpdate) {
-                  updateBatch.add(StringUtils.format(sql, segmentsTable, now, id));
-                }
-                updateBatch.execute();
-                return null;
+        numUpdatedRows = connector.retryWithHandle(
+            handle -> {
+              final Batch updateBatch = handle.createBatch();
+              final String sql = "UPDATE %1$s SET used_status_last_updated = '%2$s' WHERE id = '%3$s'";
+              String now = DateTimes.nowUtc().toString();
+              for (String id : segmentsToUpdate) {
+                updateBatch.add(StringUtils.format(sql, segmentsTable, now, id));
               }
+              int[] results = updateBatch.execute();
+              return Arrays.stream(results).sum();
             }
         );
+        totalUpdatedEntries += numUpdatedRows;
       }
       catch (Exception e) {
-        log.warn(e, "Population of used_status_last_updated in [%s] has failed. There may be unused segments with"
-                    + " NULL values for used_status_last_updated that won't be killed!", segmentsTable);
+        log.warn(e, "Populating column 'used_status_last_updated' in table[%s] has failed. There may be unused segments with"
+                    + " NULL values for 'used_status_last_updated' that won't be killed!", segmentsTable);
         return;
       }
 
-      totalUpdatedEntries += segmentsToUpdate.size();
-      log.info("Updated a batch of %d rows in [%s] with a valid used_status_last_updated date",
-               segmentsToUpdate.size(),
-               segmentsTable
+      log.debug(
+          "Updated a batch of [%d] rows in table[%s] with a valid used_status_last_updated date",
+          segmentsToUpdate.size(), segmentsTable
       );
+
+      // Do not wait if there are no more segments to update
+      if (segmentsToUpdate.size() == numUpdatedRows && numUpdatedRows < batchSize) {
+        break;
+      }
+
+      // Wait for some time before processing the next batch
       try {
         Thread.sleep(10000);
       }
@@ -420,9 +418,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
       }
     }
     log.info(
-        "Finished updating [%s] with a valid used_status_last_updated date. %d rows updated",
-        segmentsTable,
-        totalUpdatedEntries
+        "Populated column 'used_status_last_updated' in table[%s] in [%d] rows.",
+        segmentsTable, totalUpdatedEntries
+        segmentsTable, totalUpdatedEntries
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerEmptyTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerEmptyTest.java
deleted file mode 100644
index 440aff3c084..00000000000
--- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerEmptyTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.metadata;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.druid.client.ImmutableDruidDataSource;
-import org.apache.druid.segment.TestHelper;
-import org.joda.time.Period;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.stream.Collectors;
-
-
-/**
- * Like {@link SQLMetadataRuleManagerTest} except with no segments to make sure it behaves when it's empty
- */
-public class SqlSegmentsMetadataManagerEmptyTest
-{
-
-  @Rule
-  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
-
-  private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
-  private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
-
-  @Before
-  public void setUp()
-  {
-    TestDerbyConnector connector = derbyConnectorRule.getConnector();
-    SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
-    config.setPollDuration(Period.seconds(1));
-    sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
-        jsonMapper,
-        Suppliers.ofInstance(config),
-        derbyConnectorRule.metadataTablesConfigSupplier(),
-        connector
-    );
-    sqlSegmentsMetadataManager.start();
-
-    connector.createSegmentTable();
-  }
-
-  @After
-  public void teardown()
-  {
-    if (sqlSegmentsMetadataManager.isPollingDatabasePeriodically()) {
-      sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
-    }
-    sqlSegmentsMetadataManager.stop();
-  }
-
-  @Test
-  public void testPollEmpty()
-  {
-    sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    sqlSegmentsMetadataManager.poll();
-    Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
-    Assert.assertEquals(
-        ImmutableSet.of(),
-        sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
-    );
-    Assert.assertEquals(
-        ImmutableList.of(),
-        sqlSegmentsMetadataManager
-            .getImmutableDataSourcesWithAllUsedSegments()
-            .stream()
-            .map(ImmutableDruidDataSource::getName)
-            .collect(Collectors.toList())
-    );
-    Assert.assertEquals(
-        null,
-        sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia")
-    );
-    Assert.assertEquals(
-        ImmutableSet.of(),
-        ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
-    );
-  }
-
-  @Test
-  public void testStopAndStart()
-  {
-    // Simulate successive losing and getting the coordinator leadership
-    sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
-    sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
-  }
-}
diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index ab024d568db..85e5021f6c6 100644
--- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
@@ -46,101 +47,79 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 public class SqlSegmentsMetadataManagerTest
 {
+  private static class DS
+  {
+    static final String WIKI = "wikipedia";
+    static final String KOALA = "koala";
+  }
+
   private static DataSegment createSegment(
       String dataSource,
       String interval,
-      String version,
-      String bucketKey,
-      int binaryVersion
+      String version
   )
   {
     return new DataSegment(
         dataSource,
         Intervals.of(interval),
         version,
-        ImmutableMap.of(
-            "type", "s3_zip",
-            "bucket", "test",
-            "key", dataSource + "/" + bucketKey
-        ),
-        ImmutableList.of("dim1", "dim2", "dim3"),
-        ImmutableList.of("count", "value"),
+        ImmutableMap.of(),
+        ImmutableList.of(),
+        ImmutableList.of(),
         NoneShardSpec.instance(),
-        binaryVersion,
+        9,
         1234L
     );
   }
 
   @Rule
-  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule
+      = new TestDerbyConnector.DerbyConnectorRule();
 
   private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
   private SQLMetadataSegmentPublisher publisher;
-  private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+  private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
 
-  private final DataSegment segment1 = createSegment(
-      "wikipedia",
-      "2012-03-15T00:00:00.000/2012-03-16T00:00:00.000",
-      "2012-03-16T00:36:30.848Z",
-      "index/y=2012/m=03/d=15/2012-03-16T00:36:30.848Z/0/index.zip",
-      0
-  );
+  private final DataSegment wikiSegment1 =
+      CreateDataSegments.ofDatasource(DS.WIKI).startingAt("2012-03-15").eachOfSizeInMb(500).get(0);
+  private final DataSegment wikiSegment2 =
+      CreateDataSegments.ofDatasource(DS.WIKI).startingAt("2012-01-05").eachOfSizeInMb(500).get(0);
 
-  private final DataSegment segment2 = createSegment(
-      "wikipedia",
-      "2012-01-05T00:00:00.000/2012-01-06T00:00:00.000",
-      "2012-01-06T22:19:12.565Z",
-      "wikipedia/index/y=2012/m=01/d=05/2012-01-06T22:19:12.565Z/0/index.zip",
-      0
-  );
-
-  private void publish(DataSegment segment, boolean used) throws IOException
+  private void publishUnusedSegments(DataSegment... segments) throws IOException
   {
-    publish(segment, used, DateTimes.nowUtc());
+    for (DataSegment segment : segments) {
+      publisher.publishSegment(segment);
+      sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId());
+    }
   }
 
-  private void publish(DataSegment segment, boolean used, DateTime usedFlagLastUpdated) throws IOException
+  private void publishWikiSegments()
   {
-    boolean partitioned = !(segment.getShardSpec() instanceof NoneShardSpec);
-
-    String usedFlagLastUpdatedStr = null;
-    if (null != usedFlagLastUpdated) {
-      usedFlagLastUpdatedStr = usedFlagLastUpdated.toString();
+    try {
+      publisher.publishSegment(wikiSegment1);
+      publisher.publishSegment(wikiSegment2);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
     }
-    publisher.publishSegment(
-        segment.getId().toString(),
-        segment.getDataSource(),
-        DateTimes.nowUtc().toString(),
-        segment.getInterval().getStart().toString(),
-        segment.getInterval().getEnd().toString(),
-        partitioned,
-        segment.getVersion(),
-        used,
-        jsonMapper.writeValueAsBytes(segment),
-        usedFlagLastUpdatedStr
-    );
   }
 
   @Before
-  public void setUp() throws Exception
+  public void setUp()
   {
-    TestDerbyConnector connector = derbyConnectorRule.getConnector();
+    final TestDerbyConnector connector = derbyConnectorRule.getConnector();
     SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
-    config.setPollDuration(Period.seconds(3));
+    config.setPollDuration(Period.millis(1));
     sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
-        jsonMapper,
+        JSON_MAPPER,
         Suppliers.ofInstance(config),
         derbyConnectorRule.metadataTablesConfigSupplier(),
         connector
@@ -148,15 +127,12 @@ public class SqlSegmentsMetadataManagerTest
     sqlSegmentsMetadataManager.start();
 
     publisher = new SQLMetadataSegmentPublisher(
-        jsonMapper,
+        JSON_MAPPER,
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         connector
     );
 
     connector.createSegmentTable();
-
-    publisher.publishSegment(segment1);
-    publisher.publishSegment(segment2);
   }
 
   @After
@@ -168,9 +144,32 @@ public class SqlSegmentsMetadataManagerTest
     sqlSegmentsMetadataManager.stop();
   }
 
+  @Test
+  public void testPollEmpty()
+  {
+    sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
+    sqlSegmentsMetadataManager.poll();
+    Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    Assert.assertTrue(
+        sqlSegmentsMetadataManager.retrieveAllDataSourceNames().isEmpty()
+    );
+    Assert.assertEquals(
+        0,
+        sqlSegmentsMetadataManager
+            .getImmutableDataSourcesWithAllUsedSegments()
+            .stream()
+            .map(ImmutableDruidDataSource::getName).count()
+    );
+    Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.WIKI));
+    Assert.assertTrue(
+        ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()).isEmpty()
+    );
+  }
+
   @Test
   public void testPollPeriodically()
   {
+    publishWikiSegments();
     DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertNull(dataSourcesSnapshot);
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
@@ -180,22 +179,22 @@ public class SqlSegmentsMetadataManagerTest
     Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
     dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertEquals(
-        ImmutableSet.of("wikipedia"),
+        ImmutableSet.of(DS.WIKI),
         sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
     );
     Assert.assertEquals(
-        ImmutableList.of("wikipedia"),
+        ImmutableList.of(DS.WIKI),
         dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
                            .stream()
                            .map(ImmutableDruidDataSource::getName)
                            .collect(Collectors.toList())
     );
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
-        ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
+        ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource(DS.WIKI).getSegments())
     );
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
     );
   }
@@ -203,6 +202,7 @@ public class SqlSegmentsMetadataManagerTest
   @Test
   public void testPollOnDemand()
   {
+    publishWikiSegments();
     DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertNull(dataSourcesSnapshot);
     // This should return false and not wait/poll anything as we did not schedule periodic poll
@@ -214,22 +214,22 @@ public class SqlSegmentsMetadataManagerTest
     Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll);
     dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertEquals(
-        ImmutableSet.of("wikipedia"),
+        ImmutableSet.of(DS.WIKI),
         sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
     );
     Assert.assertEquals(
-        ImmutableList.of("wikipedia"),
+        ImmutableList.of(DS.WIKI),
         dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
                            .stream()
                            .map(ImmutableDruidDataSource::getName)
                            .collect(Collectors.toList())
     );
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
-        ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
+        ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource(DS.WIKI).getSegments())
     );
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
     );
   }
@@ -237,6 +237,7 @@ public class SqlSegmentsMetadataManagerTest
   @Test(timeout = 60_000)
   public void testPollPeriodicallyAndOnDemandInterleave() throws Exception
   {
+    publishWikiSegments();
     DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertNull(dataSourcesSnapshot);
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
@@ -246,15 +247,13 @@ public class SqlSegmentsMetadataManagerTest
     Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
     dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertEquals(
-        ImmutableList.of("wikipedia"),
+        ImmutableList.of(DS.WIKI),
         dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
                            .stream()
                            .map(ImmutableDruidDataSource::getName)
                            .collect(Collectors.toList())
     );
-    final String newDataSource2 = "wikipedia2";
-    final DataSegment newSegment2 = createNewSegment1(newDataSource2);
-    publisher.publishSegment(newSegment2);
+    publisher.publishSegment(createNewSegment1(DS.KOALA));
 
     // This call will force on demand poll
     sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll();
@@ -263,7 +262,7 @@ public class SqlSegmentsMetadataManagerTest
     // New datasource should now be in the snapshot since we just force on demand poll.
     dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertEquals(
-        ImmutableList.of("wikipedia2", "wikipedia"),
+        ImmutableList.of(DS.KOALA, DS.WIKI),
         dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
                            .stream()
                            .map(ImmutableDruidDataSource::getName)
@@ -271,8 +270,7 @@ public class SqlSegmentsMetadataManagerTest
     );
 
     final String newDataSource3 = "wikipedia3";
-    final DataSegment newSegment3 = createNewSegment1(newDataSource3);
-    publisher.publishSegment(newSegment3);
+    publisher.publishSegment(createNewSegment1(newDataSource3));
 
     // This time wait for periodic poll (not doing on demand poll so we have to wait a bit...)
     while (sqlSegmentsMetadataManager.getDataSourcesSnapshot().getDataSource(newDataSource3) == null) {
@@ -282,7 +280,7 @@ public class SqlSegmentsMetadataManagerTest
     Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
     dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertEquals(
-        ImmutableSet.of("wikipedia2", "wikipedia3", "wikipedia"),
+        ImmutableSet.of(DS.KOALA, "wikipedia3", DS.WIKI),
         dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
                            .stream()
                            .map(ImmutableDruidDataSource::getName)
@@ -293,29 +291,32 @@ public class SqlSegmentsMetadataManagerTest
   @Test
   public void testPrepareImmutableDataSourceWithUsedSegmentsAwaitsPollOnRestart() throws IOException
   {
-    DataSegment newSegment = pollThenStopThenStartIntro();
+    publishWikiSegments();
+    DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment();
     Assert.assertEquals(
-        ImmutableSet.of(newSegment),
-        ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia2").getSegments())
+        ImmutableSet.of(koalaSegment),
+        ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA).getSegments())
     );
   }
 
   @Test
   public void testGetDataSourceWithUsedSegmentsAwaitsPollOnRestart() throws IOException
   {
-    DataSegment newSegment = pollThenStopThenStartIntro();
+    publishWikiSegments();
+    DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment();
     Assert.assertEquals(
-        ImmutableSet.of(newSegment),
-        ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia2").getSegments())
+        ImmutableSet.of(koalaSegment),
+        ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA).getSegments())
     );
   }
 
   @Test
   public void testPrepareImmutableDataSourcesWithAllUsedSegmentsAwaitsPollOnRestart() throws IOException
   {
-    DataSegment newSegment = pollThenStopThenStartIntro();
+    publishWikiSegments();
+    DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2, newSegment),
+        ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment),
         ImmutableSet.copyOf(
             sqlSegmentsMetadataManager
                 .getImmutableDataSourcesWithAllUsedSegments()
@@ -329,54 +330,49 @@ public class SqlSegmentsMetadataManagerTest
   @Test
   public void testIterateAllUsedSegmentsAwaitsPollOnRestart() throws IOException
   {
-    DataSegment newSegment = pollThenStopThenStartIntro();
+    publishWikiSegments();
+    DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2, newSegment),
+        ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
   }
 
-  private DataSegment pollThenStopThenStartIntro() throws IOException
+  private DataSegment pollThenStopThenPublishKoalaSegment() throws IOException
   {
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
     Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
     Assert.assertEquals(
-        ImmutableSet.of("wikipedia"),
+        ImmutableSet.of(DS.WIKI),
         sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
     );
-    DataSegment newSegment = createNewSegment1("wikipedia2");
-    publisher.publishSegment(newSegment);
+    final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+    publisher.publishSegment(koalaSegment);
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    return newSegment;
+    return koalaSegment;
   }
-
+  /**
+   * Create a corrupted segment entry in the segments table to test
+   * whether the overall loading of segments from the database continues to work
+   * even if one of the entries is corrupted.
+   */
   @Test
-  public void testPollWithCorruptedSegment()
-  {
-    //create a corrupted segment entry in segments table, which tests
-    //that overall loading of segments from database continues to work
-    //even in one of the entries are corrupted.
-    publisher.publishSegment(
-        "corrupt-segment-id",
-        "corrupt-datasource",
-        "corrupt-create-date",
-        "corrupt-start-date",
-        "corrupt-end-date",
-        true,
-        "corrupt-version",
-        true,
-        StringUtils.toUtf8("corrupt-payload"),
-        "corrupt-last-used-date"
-    );
+  public void testPollWithCorruptedSegment() throws IOException
+  {
+    publishWikiSegments();
+
+    final DataSegment corruptSegment = DataSegment.builder(wikiSegment1).dataSource("corrupt-datasource").build();
+    publisher.publishSegment(corruptSegment);
+    updateSegmentPayload(corruptSegment, StringUtils.toUtf8("corrupt-payload"));
 
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
     Assert.assertEquals(
-        "wikipedia",
+        DS.WIKI,
         Iterables.getOnlyElement(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).getName()
     );
   }
@@ -384,66 +380,66 @@ public class SqlSegmentsMetadataManagerTest
   @Test
   public void testGetUnusedSegmentIntervals() throws IOException
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
 
-    // We alter the segment table to allow nullable used_status_last_updated in order to test compatibility during druid upgrade from version without used_status_last_updated.
-    derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable();
+    // Allow null values of used_status_last_updated to test upgrade from older Druid versions
+    allowUsedFlagLastUpdatedToBeNullable();
 
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
-    int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia");
+    int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.WIKI);
     Assert.assertEquals(2, numChangedSegments);
 
-    String newDs = "newDataSource";
-    final DataSegment newSegment = createSegment(
-        newDs,
+    // Publish an unused segment with used_status_last_updated 2 hours ago
+    final DataSegment koalaSegment1 = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-    publish(newSegment, false, DateTimes.nowUtc().minus(Duration.parse("PT7200S").getMillis()));
+    publishUnusedSegments(koalaSegment1);
+    updateUsedStatusLastUpdated(koalaSegment1, DateTimes.nowUtc().minus(Duration.standardHours(2)));
 
-    final DataSegment newSegment2 = createSegment(
-        newDs,
+    // Publish an unused segment with used_status_last_updated 2 days ago
+    final DataSegment koalaSegment2 = createSegment(
+        DS.KOALA,
         "2017-10-16T00:00:00.000/2017-10-17T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-    publish(newSegment2, false, DateTimes.nowUtc().minus(Duration.parse("PT172800S").getMillis()));
+    publishUnusedSegments(koalaSegment2);
+    updateUsedStatusLastUpdated(koalaSegment2, DateTimes.nowUtc().minus(Duration.standardDays(2)));
 
-    final DataSegment newSegment3 = createSegment(
-        newDs,
+    // Publish an unused segment and set used_status_last_updated to null
+    final DataSegment koalaSegment3 = createSegment(
+        DS.KOALA,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-    publish(newSegment3, false, null);
+    publishUnusedSegments(koalaSegment3);
+    updateUsedStatusLastUpdatedToNull(koalaSegment3);
 
     Assert.assertEquals(
-        ImmutableList.of(segment2.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        ImmutableList.of(wikiSegment2.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null, DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
 
     // Test the DateTime maxEndTime argument of getUnusedSegmentIntervals
     Assert.assertEquals(
-        ImmutableList.of(segment2.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        ImmutableList.of(wikiSegment2.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null, DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
     Assert.assertEquals(
-        ImmutableList.of(segment1.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        ImmutableList.of(wikiSegment1.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
     Assert.assertEquals(
         ImmutableList.of(),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
 
     Assert.assertEquals(
-        ImmutableList.of(segment2.getInterval(), segment1.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        ImmutableList.of(wikiSegment2.getInterval(), wikiSegment1.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null, DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
 
     // Test a buffer period that should exclude some segments
@@ -451,15 +447,21 @@ public class SqlSegmentsMetadataManagerTest
     // The wikipedia datasource has segments generated with last used time equal to roughly the time of test run. None of these segments should be selected with a buffer period of 1 day
     Assert.assertEquals(
         ImmutableList.of(),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
     );
 
-    // One of the 3 segments in newDs has a null used_status_last_updated which should mean getUnusedSegmentIntervals never returns it
-    // One of the 3 segments in newDs has a used_status_last_updated older than 1 day which means it should also be returned
-    // The last of the 3 segemns in newDs has a used_status_last_updated date less than one day and should not be returned
+    // koalaSegment3 has a null used_status_last_updated which should mean getUnusedSegmentIntervals never returns it
+    // koalaSegment2 has a used_status_last_updated older than 1 day which means it should be returned
+    // The last of the 3 segments in koala has a used_status_last_updated date less than one day and should not be returned
     Assert.assertEquals(
-        ImmutableList.of(newSegment2.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(newDs, DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
+        ImmutableList.of(koalaSegment2.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(
+            DS.KOALA,
+            DateTimes.COMPARE_DATE_AS_STRING_MIN,
+            DateTimes.of("3000"),
+            5,
+            DateTimes.nowUtc().minus(Duration.parse("PT86400S"))
+        )
     );
   }
 
@@ -470,37 +472,30 @@ public class SqlSegmentsMetadataManagerTest
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment = createNewSegment1(newDataSource);
-
-    publisher.publishSegment(newSegment);
+    publisher.publishSegment(createNewSegment1(DS.KOALA));
 
-    awaitDataSourceAppeared(newDataSource);
-    int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(newDataSource);
+    awaitDataSourceAppeared(DS.KOALA);
+    int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.KOALA);
     Assert.assertEquals(1, numChangedSegments);
-    awaitDataSourceDisappeared(newDataSource);
-    Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+    awaitDataSourceDisappeared(DS.KOALA);
+    Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
   }
 
-  private static DataSegment createNewSegment1(String newDataSource)
+  private static DataSegment createNewSegment1(String datasource)
   {
     return createSegment(
-        newDataSource,
+        datasource,
         "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
   }
 
-  private static DataSegment createNewSegment2(String newDataSource)
+  private static DataSegment createNewSegment2(String datasource)
   {
     return createSegment(
-        newDataSource,
+        datasource,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
   }
 
@@ -511,188 +506,171 @@ public class SqlSegmentsMetadataManagerTest
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    publisher.publishSegment(newSegment);
-    awaitDataSourceAppeared(newDataSource);
-    Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+    publisher.publishSegment(koalaSegment);
+    awaitDataSourceAppeared(DS.KOALA);
+    Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
 
-    Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId()));
-    awaitDataSourceDisappeared(newDataSource);
-    Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+    Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(koalaSegment.getId()));
+    awaitDataSourceDisappeared(DS.KOALA);
+    Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
   }
 
-  private void awaitDataSourceAppeared(String newDataSource) throws InterruptedException
+  private void awaitDataSourceAppeared(String datasource) throws InterruptedException
   {
-    while (sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource) == null) {
-      Thread.sleep(1000);
+    while (sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(datasource) == null) {
+      Thread.sleep(5);
     }
   }
 
   private void awaitDataSourceDisappeared(String dataSource) throws InterruptedException
   {
     while (sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(dataSource) != null) {
-      Thread.sleep(1000);
+      Thread.sleep(5);
     }
   }
 
   @Test
   public void testMarkAsUsedNonOvershadowedSegments() throws Exception
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment1 = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    final DataSegment newSegment2 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment2 = createSegment(
+        DS.KOALA,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-16T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        1
+        "2017-10-16T20:19:12.565Z"
     );
 
-    // Overshadowed by newSegment2
-    final DataSegment newSegment3 = createSegment(
-        newDataSource,
+    // Overshadowed by koalaSegment2
+    final DataSegment koalaSegment3 = createSegment(
+        DS.KOALA,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        1
+        "2017-10-15T20:19:12.565Z"
     );
 
-    publish(newSegment1, false);
-    publish(newSegment2, false);
-    publish(newSegment3, false);
-    final ImmutableSet<String> segmentIds = ImmutableSet.of(
-        newSegment1.getId().toString(),
-        newSegment2.getId().toString(),
-        newSegment3.getId().toString()
+    publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
+    final Set<String> segmentIds = ImmutableSet.of(
+        koalaSegment1.getId().toString(),
+        koalaSegment2.getId().toString(),
+        koalaSegment3.getId().toString()
     );
 
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
-    Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource, segmentIds));
+    Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA, segmentIds));
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
   }
 
-  @Test(expected = UnknownSegmentIdsException.class)
+  @Test
   public void testMarkAsUsedNonOvershadowedSegmentsInvalidDataSource() throws Exception
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
-    final DataSegment newSegment2 = createNewSegment1(newDataSource);
+    final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+    final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
 
-    publish(newSegment1, false);
-    publish(newSegment2, false);
+    publishUnusedSegments(koalaSegment1, koalaSegment2);
     final ImmutableSet<String> segmentIds =
-        ImmutableSet.of(newSegment1.getId().toString(), newSegment2.getId().toString());
+        ImmutableSet.of(koalaSegment1.getId().toString(), koalaSegment2.getId().toString());
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
-    // none of the segments are in data source
-    Assert.assertEquals(0, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource", segmentIds));
+
+    Assert.assertThrows(
+        UnknownSegmentIdsException.class,
+        () -> sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource", segmentIds)
+    );
   }
 
-  @Test(expected = UnknownSegmentIdsException.class)
-  public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds() throws UnknownSegmentIdsException
+  @Test
+  public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds()
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
-    final DataSegment newSegment2 = createNewSegment1(newDataSource);
+    final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+    final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
 
     final ImmutableSet<String> segmentIds =
-        ImmutableSet.of(newSegment1.getId().toString(), newSegment2.getId().toString());
+        ImmutableSet.of(koalaSegment1.getId().toString(), koalaSegment2.getId().toString());
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
-    // none of the segments are in data source
-    Assert.assertEquals(0, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource, segmentIds));
+
+    Assert.assertThrows(
+        UnknownSegmentIdsException.class,
+        () -> sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA, segmentIds)
+    );
   }
 
   @Test
   public void testMarkAsUsedNonOvershadowedSegmentsInInterval() throws IOException
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
-    final DataSegment newSegment2 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+    final DataSegment koalaSegment2 = createSegment(
+        DS.KOALA,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-16T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        1
+        "2017-10-16T20:19:12.565Z"
     );
-
-    final DataSegment newSegment3 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment3 = createSegment(
+        DS.KOALA,
         "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    // Overshadowed by newSegment2
-    final DataSegment newSegment4 = createNewSegment2(newDataSource);
+    // Overshadowed by koalaSegment2
+    final DataSegment koalaSegment4 = createNewSegment2(DS.KOALA);
 
-    publish(newSegment1, false);
-    publish(newSegment2, false);
-    publish(newSegment3, false);
-    publish(newSegment4, false);
+    publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3, koalaSegment4);
     final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");
 
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
 
     // 2 out of 3 segments match the interval
-    Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(newDataSource, theInterval));
+    Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval));
 
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
   }
@@ -700,56 +678,47 @@ public class SqlSegmentsMetadataManagerTest
   @Test
   public void testMarkAsUsedNonOvershadowedSegmentsInIntervalWithOverlappingInterval() throws IOException
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment1 = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    final DataSegment newSegment2 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment2 = createSegment(
+        DS.KOALA,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-16T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        1
+        "2017-10-16T20:19:12.565Z"
     );
 
-    final DataSegment newSegment3 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment3 = createSegment(
+        DS.KOALA,
         "2017-10-19T00:00:00.000/2017-10-22T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    // Overshadowed by newSegment2
-    final DataSegment newSegment4 = createNewSegment2(newDataSource);
+    // Overshadowed by koalaSegment2
+    final DataSegment koalaSegment4 = createNewSegment2(DS.KOALA);
 
-    publish(newSegment1, false);
-    publish(newSegment2, false);
-    publish(newSegment3, false);
-    publish(newSegment4, false);
+    publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3, koalaSegment4);
     final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");
 
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
 
     // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked used
-    Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(newDataSource, theInterval));
+    Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval));
 
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2, newSegment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment2),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
   }
@@ -757,24 +726,24 @@ public class SqlSegmentsMetadataManagerTest
   @Test
   public void testMarkSegmentsAsUnused() throws IOException
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createNewSegment1(newDataSource);
+    final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+    final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
 
-    final DataSegment newSegment2 = createNewSegment1(newDataSource);
+    publisher.publishSegment(koalaSegment1);
+    publisher.publishSegment(koalaSegment2);
 
-    publisher.publishSegment(newSegment1);
-    publisher.publishSegment(newSegment2);
     final ImmutableSet<SegmentId> segmentIds =
-        ImmutableSet.of(newSegment1.getId(), newSegment1.getId());
+        ImmutableSet.of(koalaSegment1.getId(), koalaSegment1.getId());
 
     Assert.assertEquals(segmentIds.size(), sqlSegmentsMetadataManager.markSegmentsAsUnused(segmentIds));
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.of(wikiSegment1, wikiSegment2),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
   }
@@ -782,34 +751,30 @@ public class SqlSegmentsMetadataManagerTest
   @Test
   public void testMarkAsUnusedSegmentsInInterval() throws IOException
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
-    final DataSegment newSegment2 = createNewSegment2(newDataSource);
-
-    final DataSegment newSegment3 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+    final DataSegment koalaSegment2 = createNewSegment2(DS.KOALA);
+    final DataSegment koalaSegment3 = createSegment(
+        DS.KOALA,
         "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    publisher.publishSegment(newSegment1);
-    publisher.publishSegment(newSegment2);
-    publisher.publishSegment(newSegment3);
+    publisher.publishSegment(koalaSegment1);
+    publisher.publishSegment(koalaSegment2);
+    publisher.publishSegment(koalaSegment3);
     final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");
 
     // 2 out of 3 segments match the interval
-    Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(newDataSource, theInterval));
+    Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval));
 
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2, newSegment3),
+        ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment3),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
   }
@@ -817,40 +782,34 @@ public class SqlSegmentsMetadataManagerTest
   @Test
   public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws IOException
   {
+    publishWikiSegments();
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
 
-    final String newDataSource = "wikipedia2";
-    final DataSegment newSegment1 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment1 = createSegment(
+        DS.KOALA,
         "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-
-    final DataSegment newSegment2 = createNewSegment2(newDataSource);
-
-    final DataSegment newSegment3 = createSegment(
-        newDataSource,
+    final DataSegment koalaSegment2 = createNewSegment2(DS.KOALA);
+    final DataSegment koalaSegment3 = createSegment(
+        DS.KOALA,
         "2017-10-19T00:00:00.000/2017-10-22T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
 
-    publisher.publishSegment(newSegment1);
-    publisher.publishSegment(newSegment2);
-    publisher.publishSegment(newSegment3);
+    publisher.publishSegment(koalaSegment1);
+    publisher.publishSegment(koalaSegment2);
+    publisher.publishSegment(koalaSegment3);
     final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");
 
     // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked unused
-    Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(newDataSource, theInterval));
+    Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval));
 
     sqlSegmentsMetadataManager.poll();
     Assert.assertEquals(
-        ImmutableSet.of(segment1, segment2, newSegment1, newSegment3),
+        ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment3),
         ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
     );
   }
@@ -868,81 +827,133 @@ public class SqlSegmentsMetadataManagerTest
   @Test
   public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() throws Exception
   {
+    publishWikiSegments();
     final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000");
-    Optional<Iterable<DataSegment>> segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
-        "wikipedia", theInterval, true
+
+    // Re-create SqlSegmentsMetadataManager with a higher poll duration
+    final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
+    config.setPollDuration(Period.seconds(1));
+    sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
+        JSON_MAPPER,
+        Suppliers.ofInstance(config),
+        derbyConnectorRule.metadataTablesConfigSupplier(),
+        derbyConnectorRule.getConnector()
     );
+    sqlSegmentsMetadataManager.start();
+
+    Optional<Iterable<DataSegment>> segments = sqlSegmentsMetadataManager
+        .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DS.WIKI, theInterval, true);
     Assert.assertTrue(segments.isPresent());
     Set<DataSegment> dataSegmentSet = ImmutableSet.copyOf(segments.get());
     Assert.assertEquals(1, dataSegmentSet.size());
-    Assert.assertTrue(dataSegmentSet.contains(segment1));
+    Assert.assertTrue(dataSegmentSet.contains(wikiSegment1));
 
-    final DataSegment newSegment2 = createSegment(
-        "wikipedia",
+    final DataSegment wikiSegment3 = createSegment(
+        DS.WIKI,
         "2012-03-16T00:00:00.000/2012-03-17T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-    publisher.publishSegment(newSegment2);
+    publisher.publishSegment(wikiSegment3);
 
     // New segment is not returned since we call without force poll
-    segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
-        "wikipedia", theInterval, false
-    );
+    segments = sqlSegmentsMetadataManager
+        .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DS.WIKI, theInterval, false);
     Assert.assertTrue(segments.isPresent());
     dataSegmentSet = ImmutableSet.copyOf(segments.get());
     Assert.assertEquals(1, dataSegmentSet.size());
-    Assert.assertTrue(dataSegmentSet.contains(segment1));
+    Assert.assertTrue(dataSegmentSet.contains(wikiSegment1));
 
     // New segment is returned since we call with force poll
-    segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
-        "wikipedia", theInterval, true
-    );
+    segments = sqlSegmentsMetadataManager
+        .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DS.WIKI, theInterval, true);
     Assert.assertTrue(segments.isPresent());
     dataSegmentSet = ImmutableSet.copyOf(segments.get());
     Assert.assertEquals(2, dataSegmentSet.size());
-    Assert.assertTrue(dataSegmentSet.contains(segment1));
-    Assert.assertTrue(dataSegmentSet.contains(newSegment2));
+    Assert.assertTrue(dataSegmentSet.contains(wikiSegment1));
+    Assert.assertTrue(dataSegmentSet.contains(wikiSegment3));
   }
 
   @Test
   public void testPopulateUsedFlagLastUpdated() throws IOException
   {
-    derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable();
-    final DataSegment newSegment = createSegment(
-        "dummyDS",
+    allowUsedFlagLastUpdatedToBeNullable();
+    final DataSegment koalaSegment = createSegment(
+        DS.KOALA,
         "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
-        "2017-10-15T20:19:12.565Z",
-        "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
-        0
+        "2017-10-15T20:19:12.565Z"
     );
-    publish(newSegment, false, null);
-    Assert.assertTrue(getCountOfRowsWithLastUsedNull() > 0);
+
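+    // Publish an unused segment and null out its used_status_last_updated
+    // to mimic a row written before the column existed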
+    publishUnusedSegments(koalaSegment);
+    updateUsedStatusLastUpdatedToNull(koalaSegment);
+
+    Assert.assertEquals(1, getCountOfRowsWithLastUsedNull());
     sqlSegmentsMetadataManager.populateUsedFlagLastUpdated();
-    Assert.assertTrue(getCountOfRowsWithLastUsedNull() == 0);
+    Assert.assertEquals(0, getCountOfRowsWithLastUsedNull());
+  }
+
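+  /**
+   * Overwrites the PAYLOAD column of the given segment in the segments table.
+   */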
+  private void updateSegmentPayload(DataSegment segment, byte[] payload)
+  {
+    executeUpdate(
+        "UPDATE %1$s SET PAYLOAD = ? WHERE ID = ?",
+        payload,
+        segment.getId().toString()
+    );
   }
 
   private int getCountOfRowsWithLastUsedNull()
   {
     return derbyConnectorRule.getConnector().retryWithHandle(
-        new HandleCallback<Integer>()
-        {
-          @Override
-          public Integer withHandle(Handle handle)
-          {
-            List<Map<String, Object>> lst = handle.select(
-                StringUtils.format(
-                    "SELECT * FROM %1$s WHERE USED_STATUS_LAST_UPDATED IS NULL",
-                    derbyConnectorRule.metadataTablesConfigSupplier()
-                                      .get()
-                                      .getSegmentsTable()
-                                      .toUpperCase(Locale.ENGLISH)
-                )
-            );
-            return lst.size();
-          }
-        }
+        handle -> handle.select(
+            StringUtils.format(
+                "SELECT ID FROM %1$s WHERE USED_STATUS_LAST_UPDATED IS NULL",
+                getSegmentsTable()
+            )
+        ).size()
     );
   }
+
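+  /**
+   * Sets the USED_STATUS_LAST_UPDATED column of the given segment to the given timestamp.
+   */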
+  private void updateUsedStatusLastUpdated(DataSegment segment, DateTime newValue)
+  {
+    executeUpdate(
+        "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?",
+        newValue.toString(),
+        segment.getId().toString()
+    );
+  }
+
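+  /**
+   * Sets the USED_STATUS_LAST_UPDATED column of the given segment to null.
+   */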
+  private void updateUsedStatusLastUpdatedToNull(DataSegment segment)
+  {
+    executeUpdate(
+        "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = NULL WHERE ID = ?",
+        segment.getId().toString()
+    );
+  }
+
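+  /**
+   * Executes an update against the segments table, substituting the table name
+   * into the given SQL format string.
+   */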
+  private void executeUpdate(String sqlFormat, Object... args)
+  {
+    derbyConnectorRule.getConnector().retryWithHandle(
+        handle -> handle.update(
+            StringUtils.format(sqlFormat, getSegmentsTable()),
+            args
+        )
+    );
+  }
+
+  /**
+   * Alters the column used_status_last_updated to be nullable. This is used to
+   * test backward compatibility with versions of Druid without this column
+   * present in the segments table.
+   */
+  private void allowUsedFlagLastUpdatedToBeNullable()
+  {
+    executeUpdate("ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED 
NULL");
+  }
+
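+  /**
+   * Returns the name of the segments table in upper case.
+   */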
+  private String getSegmentsTable()
+  {
+    return derbyConnectorRule.metadataTablesConfigSupplier()
+                             .get()
+                             .getSegmentsTable()
+                             .toUpperCase(Locale.ENGLISH);
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
index d0d8357837c..e5460ce402b 100644
--- a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
+++ b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
@@ -25,14 +25,10 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.storage.derby.DerbyConnector;
 import org.junit.Assert;
 import org.junit.rules.ExternalResource;
-import org.skife.jdbi.v2.Batch;
 import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import java.sql.SQLException;
-import java.util.Locale;
 import java.util.UUID;
 
 public class TestDerbyConnector extends DerbyConnector
@@ -139,27 +135,5 @@ public class TestDerbyConnector extends DerbyConnector
     {
       return dbTables;
     }
-
-    public void allowUsedFlagLastUpdatedToBeNullable()
-    {
-      connector.retryWithHandle(
-          new HandleCallback<Void>()
-          {
-            @Override
-            public Void withHandle(Handle handle)
-            {
-              final Batch batch = handle.createBatch();
-              batch.add(
-                  StringUtils.format(
-                      "ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED NULL",
-                      dbTables.get().getSegmentsTable().toUpperCase(Locale.ENGLISH)
-                  )
-              );
-              batch.execute();
-              return null;
-            }
-          }
-      );
-    }
   }
 }

