abhishekrb19 commented on code in PR #16044:
URL: https://github.com/apache/druid/pull/16044#discussion_r1516173728
##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -293,29 +292,32 @@ public void testPollPeriodicallyAndOnDemandInterleave()
throws Exception
@Test
public void
testPrepareImmutableDataSourceWithUsedSegmentsAwaitsPollOnRestart() throws
IOException
{
- DataSegment newSegment = pollThenStopThenStartIntro();
+ publishWikiSegments();
+ DataSegment koalaSegment = pollThenStopThenStartIntro();
Review Comment:
It isn't immediately obvious why a koala segment is being returned by
`pollThenStopThenStartIntro`. Looking at its implementation, the function
publishes a segment for the koala datasource at the end. I'm guessing that's what
the "start intro" in the function name refers to. Should we rename the function
to `pollThenStopThenPublishKoalaSegment`, or break up its functionality?
##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -343,123 +346,122 @@ private DataSegment pollThenStopThenStartIntro() throws
IOException
sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertEquals(
- ImmutableSet.of("wikipedia"),
+ ImmutableSet.of(DS.WIKI),
sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
);
- DataSegment newSegment = createNewSegment1("wikipedia2");
- publisher.publishSegment(newSegment);
+ final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+ publisher.publishSegment(koalaSegment);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
- return newSegment;
+ return koalaSegment;
}
@Test
- public void testPollWithCorruptedSegment()
+ public void testPollWithCorruptedSegment() throws IOException
{
+ publishWikiSegments();
+
//create a corrupted segment entry in segments table, which tests
//that overall loading of segments from database continues to work
//even in one of the entries are corrupted.
Review Comment:
Make this comment a Javadoc and fix a typo ("even in one of the entries are corrupted" should read "even if one of the entries is corrupted"):
```suggestion
```
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -347,70 +347,68 @@ public void populateUsedFlagLastUpdatedAsync()
@VisibleForTesting
void populateUsedFlagLastUpdated()
{
- String segmentsTable = getSegmentsTable();
+ final String segmentsTable = getSegmentsTable();
log.info(
- "Populating used_status_last_updated with non-NULL values for unused
segments in [%s]",
+ "Populating column 'used_status_last_updated' with non-NULL values for
unused segments in table[%s].",
segmentsTable
);
- int limit = 100;
+ final int batchSize = 100;
int totalUpdatedEntries = 0;
+ // Update the rows in batches of size 100
while (true) {
- List<String> segmentsToUpdate = new ArrayList<>(100);
+ final List<String> segmentsToUpdate = new ArrayList<>(batchSize);
+ int numUpdatedRows;
try {
connector.retryWithHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle)
- {
- segmentsToUpdate.addAll(handle.createQuery(
- StringUtils.format(
- "SELECT id FROM %1$s WHERE used_status_last_updated IS
NULL and used = :used %2$s",
- segmentsTable,
- connector.limitClause(limit)
- )
- ).bind("used", false).mapTo(String.class).list());
- return null;
- }
+ handle -> {
+ segmentsToUpdate.addAll(handle.createQuery(
+ StringUtils.format(
+ "SELECT id FROM %1$s WHERE used_status_last_updated IS
NULL and used = :used %2$s",
+ segmentsTable,
+ connector.limitClause(batchSize)
+ )
+ ).bind("used", false).mapTo(String.class).list());
+ return null;
}
);
if (segmentsToUpdate.isEmpty()) {
- // We have no segments to process
break;
}
- connector.retryWithHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle)
- {
- Batch updateBatch = handle.createBatch();
- String sql = "UPDATE %1$s SET used_status_last_updated =
'%2$s' WHERE id = '%3$s'";
- String now = DateTimes.nowUtc().toString();
- for (String id : segmentsToUpdate) {
- updateBatch.add(StringUtils.format(sql, segmentsTable, now,
id));
- }
- updateBatch.execute();
- return null;
+ numUpdatedRows = connector.retryWithHandle(
+ handle -> {
+ final Batch updateBatch = handle.createBatch();
+ final String sql = "UPDATE %1$s SET used_status_last_updated =
'%2$s' WHERE id = '%3$s'";
+ String now = DateTimes.nowUtc().toString();
+ for (String id : segmentsToUpdate) {
+ updateBatch.add(StringUtils.format(sql, segmentsTable, now,
id));
}
+ int[] results = updateBatch.execute();
+ return Arrays.stream(results).sum();
}
);
+ totalUpdatedEntries += numUpdatedRows;
}
catch (Exception e) {
log.warn(e, "Population of used_status_last_updated in [%s] has
failed. There may be unused segments with"
+ " NULL values for used_status_last_updated that won't be
killed!", segmentsTable);
Review Comment:
```suggestion
log.warn(e, "Populating column 'used_status_last_updated' in
table[%s] has failed. There may be unused segments with"
+ " NULL values for 'used_status_last_updated' that
won't be killed!", segmentsTable);
```
##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -343,123 +346,122 @@ private DataSegment pollThenStopThenStartIntro() throws
IOException
sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertEquals(
- ImmutableSet.of("wikipedia"),
+ ImmutableSet.of(DS.WIKI),
sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
);
- DataSegment newSegment = createNewSegment1("wikipedia2");
- publisher.publishSegment(newSegment);
+ final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+ publisher.publishSegment(koalaSegment);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
- return newSegment;
+ return koalaSegment;
}
Review Comment:
```suggestion
/**
* Create a corrupted segment entry in the segments table to test
* whether the overall loading of segments from the database continues to
work
* even if one of the entries is corrupted.
*/
```
##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -343,123 +346,122 @@ private DataSegment pollThenStopThenStartIntro() throws
IOException
sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertEquals(
- ImmutableSet.of("wikipedia"),
+ ImmutableSet.of(DS.WIKI),
sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
);
- DataSegment newSegment = createNewSegment1("wikipedia2");
- publisher.publishSegment(newSegment);
+ final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+ publisher.publishSegment(koalaSegment);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
- return newSegment;
+ return koalaSegment;
}
@Test
- public void testPollWithCorruptedSegment()
+ public void testPollWithCorruptedSegment() throws IOException
{
+ publishWikiSegments();
+
//create a corrupted segment entry in segments table, which tests
//that overall loading of segments from database continues to work
//even in one of the entries are corrupted.
- publisher.publishSegment(
- "corrupt-segment-id",
- "corrupt-datasource",
- "corrupt-create-date",
- "corrupt-start-date",
- "corrupt-end-date",
- true,
- "corrupt-version",
- true,
- StringUtils.toUtf8("corrupt-payload"),
- "corrupt-last-used-date"
- );
+ final DataSegment corruptSegment =
DataSegment.builder(wikiSegment1).dataSource("corrupt-datasource").build();
+ publisher.publishSegment(corruptSegment);
+ updateSegmentPayload(corruptSegment,
StringUtils.toUtf8("corrupt-payload"));
EmittingLogger.registerEmitter(new NoopServiceEmitter());
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertEquals(
- "wikipedia",
+ DS.WIKI,
Iterables.getOnlyElement(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).getName()
);
}
@Test
public void testGetUnusedSegmentIntervals() throws IOException
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
- // We alter the segment table to allow nullable used_status_last_updated
in order to test compatibility during druid upgrade from version without
used_status_last_updated.
- derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable();
+ // Allow null values of used_status_last_updated to test upgrade from
older Druid versions
+ allowUsedFlagLastUpdatedToBeNullable();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- int numChangedSegments =
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia");
+ int numChangedSegments =
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.WIKI);
Assert.assertEquals(2, numChangedSegments);
- String newDs = "newDataSource";
- final DataSegment newSegment = createSegment(
- newDs,
+ // Publish an unused segment with used_status_last_updated 2 hours ago
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment, false,
DateTimes.nowUtc().minus(Duration.parse("PT7200S").getMillis()));
+ publishUnusedSegments(koalaSegment1);
+ updateUsedStatusLastUpdated(koalaSegment1,
DateTimes.nowUtc().minus(Duration.standardHours(2)));
- final DataSegment newSegment2 = createSegment(
- newDs,
+ // Publish an unused with used_status_last_updated 2 days ago
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
"2017-10-16T00:00:00.000/2017-10-17T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment2, false,
DateTimes.nowUtc().minus(Duration.parse("PT172800S").getMillis()));
+ publishUnusedSegments(koalaSegment2);
+ updateUsedStatusLastUpdated(koalaSegment2,
DateTimes.nowUtc().minus(Duration.standardDays(2)));
- final DataSegment newSegment3 = createSegment(
- newDs,
+ // Publish an unused segment and set used_status_last_updated to null
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment3, false, null);
+ publishUnusedSegments(koalaSegment3);
+ updateUsedStatusLastUpdatedToNull(koalaSegment3);
Assert.assertEquals(
- ImmutableList.of(segment2.getInterval()),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
null, DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ ImmutableList.of(wikiSegment2.getInterval()),
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null,
DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
// Test the DateTime maxEndTime argument of getUnusedSegmentIntervals
Assert.assertEquals(
- ImmutableList.of(segment2.getInterval()),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
null, DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ ImmutableList.of(wikiSegment2.getInterval()),
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null,
DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
Assert.assertEquals(
- ImmutableList.of(segment1.getInterval()),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1,
DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ ImmutableList.of(wikiSegment1.getInterval()),
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI,
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1,
DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
Assert.assertEquals(
ImmutableList.of(),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1,
DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI,
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1,
DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
Assert.assertEquals(
- ImmutableList.of(segment2.getInterval(), segment1.getInterval()),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
null, DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ ImmutableList.of(wikiSegment2.getInterval(),
wikiSegment1.getInterval()),
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null,
DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
// Test a buffer period that should exclude some segments
// The wikipedia datasource has segments generated with last used time
equal to roughly the time of test run. None of these segments should be
selected with a bufer period of 1 day
Assert.assertEquals(
ImmutableList.of(),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5,
DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI,
DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5,
DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
);
- // One of the 3 segments in newDs has a null used_status_last_updated
which should mean getUnusedSegmentIntervals never returns it
- // One of the 3 segments in newDs has a used_status_last_updated older
than 1 day which means it should also be returned
- // The last of the 3 segemns in newDs has a used_status_last_updated date
less than one day and should not be returned
+ // koalaSegment3 has a null used_status_last_updated which should mean
getUnusedSegmentIntervals never returns it
+ // koalaSegment2 has a used_status_last_updated older than 1 day which
means it should also be returned
Review Comment:
```suggestion
// koalaSegment2 has a used_status_last_updated older than 1 day which
means it should be returned
```
##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -511,346 +506,312 @@ public void testMarkSegmentAsUnused() throws
IOException, InterruptedException
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment = createSegment(
- newDataSource,
+ final DataSegment koalaSegment = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publisher.publishSegment(newSegment);
- awaitDataSourceAppeared(newDataSource);
-
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+ publisher.publishSegment(koalaSegment);
+ awaitDataSourceAppeared(DS.KOALA);
+
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
-
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId()));
- awaitDataSourceDisappeared(newDataSource);
-
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(koalaSegment.getId()));
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ awaitDataSourceDisappeared(DS.KOALA);
+
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
+ System.out.println("Total time:" + stopwatch.millisElapsed());
Review Comment:
Are the stopwatch and the print statement required?
##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -343,123 +346,122 @@ private DataSegment pollThenStopThenStartIntro() throws
IOException
sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertEquals(
- ImmutableSet.of("wikipedia"),
+ ImmutableSet.of(DS.WIKI),
sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
);
- DataSegment newSegment = createNewSegment1("wikipedia2");
- publisher.publishSegment(newSegment);
+ final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+ publisher.publishSegment(koalaSegment);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
- return newSegment;
+ return koalaSegment;
}
@Test
- public void testPollWithCorruptedSegment()
+ public void testPollWithCorruptedSegment() throws IOException
{
+ publishWikiSegments();
+
//create a corrupted segment entry in segments table, which tests
//that overall loading of segments from database continues to work
//even in one of the entries are corrupted.
- publisher.publishSegment(
- "corrupt-segment-id",
- "corrupt-datasource",
- "corrupt-create-date",
- "corrupt-start-date",
- "corrupt-end-date",
- true,
- "corrupt-version",
- true,
- StringUtils.toUtf8("corrupt-payload"),
- "corrupt-last-used-date"
- );
+ final DataSegment corruptSegment =
DataSegment.builder(wikiSegment1).dataSource("corrupt-datasource").build();
+ publisher.publishSegment(corruptSegment);
+ updateSegmentPayload(corruptSegment,
StringUtils.toUtf8("corrupt-payload"));
EmittingLogger.registerEmitter(new NoopServiceEmitter());
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertEquals(
- "wikipedia",
+ DS.WIKI,
Iterables.getOnlyElement(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).getName()
);
}
@Test
public void testGetUnusedSegmentIntervals() throws IOException
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
- // We alter the segment table to allow nullable used_status_last_updated
in order to test compatibility during druid upgrade from version without
used_status_last_updated.
- derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable();
+ // Allow null values of used_status_last_updated to test upgrade from
older Druid versions
+ allowUsedFlagLastUpdatedToBeNullable();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- int numChangedSegments =
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia");
+ int numChangedSegments =
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.WIKI);
Assert.assertEquals(2, numChangedSegments);
- String newDs = "newDataSource";
- final DataSegment newSegment = createSegment(
- newDs,
+ // Publish an unused segment with used_status_last_updated 2 hours ago
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment, false,
DateTimes.nowUtc().minus(Duration.parse("PT7200S").getMillis()));
+ publishUnusedSegments(koalaSegment1);
+ updateUsedStatusLastUpdated(koalaSegment1,
DateTimes.nowUtc().minus(Duration.standardHours(2)));
- final DataSegment newSegment2 = createSegment(
- newDs,
+ // Publish an unused with used_status_last_updated 2 days ago
Review Comment:
```suggestion
// Publish an unused segment with used_status_last_updated 2 days ago
```
##########
server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java:
##########
@@ -511,346 +506,312 @@ public void testMarkSegmentAsUnused() throws
IOException, InterruptedException
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment = createSegment(
- newDataSource,
+ final DataSegment koalaSegment = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publisher.publishSegment(newSegment);
- awaitDataSourceAppeared(newDataSource);
-
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+ publisher.publishSegment(koalaSegment);
+ awaitDataSourceAppeared(DS.KOALA);
+
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
-
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId()));
- awaitDataSourceDisappeared(newDataSource);
-
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(koalaSegment.getId()));
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ awaitDataSourceDisappeared(DS.KOALA);
+
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
+ System.out.println("Total time:" + stopwatch.millisElapsed());
}
- private void awaitDataSourceAppeared(String newDataSource) throws
InterruptedException
+ private void awaitDataSourceAppeared(String datasource) throws
InterruptedException
{
- while
(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource)
== null) {
- Thread.sleep(1000);
+ while
(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(datasource)
== null) {
+ Thread.sleep(5);
}
}
private void awaitDataSourceDisappeared(String dataSource) throws
InterruptedException
{
while
(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(dataSource)
!= null) {
- Thread.sleep(1000);
+ Thread.sleep(5);
}
}
@Test
public void testMarkAsUsedNonOvershadowedSegments() throws Exception
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- final DataSegment newSegment2 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-16T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 1
+ "2017-10-16T20:19:12.565Z"
);
- // Overshadowed by newSegment2
- final DataSegment newSegment3 = createSegment(
- newDataSource,
+ // Overshadowed by koalaSegment2
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 1
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment1, false);
- publish(newSegment2, false);
- publish(newSegment3, false);
- final ImmutableSet<String> segmentIds = ImmutableSet.of(
- newSegment1.getId().toString(),
- newSegment2.getId().toString(),
- newSegment3.getId().toString()
+ publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
+ final Set<String> segmentIds = ImmutableSet.of(
+ koalaSegment1.getId().toString(),
+ koalaSegment2.getId().toString(),
+ koalaSegment3.getId().toString()
);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
- Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource,
segmentIds));
+ Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA,
segmentIds));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1,
koalaSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
- @Test(expected = UnknownSegmentIdsException.class)
+ @Test
public void testMarkAsUsedNonOvershadowedSegmentsInvalidDataSource() throws
Exception
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createNewSegment1(newDataSource);
+ final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+ final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
- final DataSegment newSegment2 = createNewSegment1(newDataSource);
-
- publish(newSegment1, false);
- publish(newSegment2, false);
+ publishUnusedSegments(koalaSegment1, koalaSegment2);
final ImmutableSet<String> segmentIds =
- ImmutableSet.of(newSegment1.getId().toString(),
newSegment2.getId().toString());
+ ImmutableSet.of(koalaSegment1.getId().toString(),
koalaSegment2.getId().toString());
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
- // none of the segments are in data source
- Assert.assertEquals(0,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource",
segmentIds));
+
+ Assert.assertThrows(
+ UnknownSegmentIdsException.class,
+ () ->
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource",
segmentIds)
+ );
}
- @Test(expected = UnknownSegmentIdsException.class)
- public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds()
throws UnknownSegmentIdsException
+ @Test
+ public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds()
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
- final DataSegment newSegment2 = createNewSegment1(newDataSource);
+ final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+ final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
final ImmutableSet<String> segmentIds =
- ImmutableSet.of(newSegment1.getId().toString(),
newSegment2.getId().toString());
+ ImmutableSet.of(koalaSegment1.getId().toString(),
koalaSegment2.getId().toString());
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
- // none of the segments are in data source
- Assert.assertEquals(0,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource,
segmentIds));
+
+ Assert.assertThrows(
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]