This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5f203725dd3 Clean up SqlSegmentsMetadataManager and corresponding
tests (#16044)
5f203725dd3 is described below
commit 5f203725dd39f240fc4ba5ef8181bb80a972d7c3
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Mar 8 07:34:51 2024 +0530
Clean up SqlSegmentsMetadataManager and corresponding tests (#16044)
Changes:
Improve `SqlSegmentsMetadataManager`
- Break the loop in `populateUsedFlagLastUpdated` before going to sleep
if there are no more segments to update
- Add comments and clean up logs
Refactor `SqlSegmentsMetadataManagerTest`
- Merge `SqlSegmentsMetadataManagerEmptyTest` into this test
- Add method `testPollEmpty`
- Shave a few seconds off of the tests by reducing poll duration
- Simplify creation of test segments
- Some renames here and there
- Remove unused methods
- Move `allowUsedFlagLastUpdatedToBeNullable` from `TestDerbyConnector` to this class
Other minor changes
- Add javadoc to `NoneShardSpec`
- Use lambda in `SQLMetadataSegmentPublisher`
---
.../druid/timeline/partition/NoneShardSpec.java | 10 +
.../metadata/SQLMetadataSegmentPublisher.java | 35 +-
.../druid/metadata/SqlSegmentsMetadataManager.java | 85 ++-
.../SqlSegmentsMetadataManagerEmptyTest.java | 113 ----
.../metadata/SqlSegmentsMetadataManagerTest.java | 739 +++++++++++----------
.../apache/druid/metadata/TestDerbyConnector.java | 26 -
6 files changed, 433 insertions(+), 575 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
b/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
index 4661b3ab73b..e29d843c98b 100644
---
a/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
+++
b/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
@@ -29,6 +29,16 @@ import java.util.List;
import java.util.Map;
/**
+ * {@link ShardSpec} with no partitioning in a time chunk, i.e. a single
segment
+ * per time chunk. This shard spec has been deprecated and is not generated by
+ * the Druid code anymore. The class has been retained only for backward
+ * compatibility reasons.
+ * <p>
+ * For more information, refer to
+ * <a href="https://github.com/apache/druid/pull/6883">PR #6883</a>.
+ *
+ * @deprecated Since Druid 0.15.0. Segments generated by Druid 0.15.0 onwards
+ * do not use this shard spec.
*/
@Deprecated
public class NoneShardSpec implements ShardSpec
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
index b69f15edb6b..48a92ecba4e 100644
---
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
+++
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java
@@ -20,7 +20,6 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
@@ -28,8 +27,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
import java.util.List;
@@ -79,8 +76,7 @@ public class SQLMetadataSegmentPublisher implements
MetadataSegmentPublisher
);
}
- @VisibleForTesting
- void publishSegment(
+ private void publishSegment(
final String segmentId,
final String dataSource,
final String createdDate,
@@ -96,31 +92,18 @@ public class SQLMetadataSegmentPublisher implements
MetadataSegmentPublisher
try {
final DBI dbi = connector.getDBI();
List<Map<String, Object>> exists = dbi.withHandle(
- new HandleCallback<List<Map<String, Object>>>()
- {
- @Override
- public List<Map<String, Object>> withHandle(Handle handle)
- {
- return handle.createQuery(
- StringUtils.format("SELECT id FROM %s WHERE id=:id",
config.getSegmentsTable())
- )
- .bind("id", segmentId)
- .list();
- }
- }
+ handle -> handle.createQuery(
+ StringUtils.format("SELECT id FROM %s WHERE id=:id",
config.getSegmentsTable())
+ ).bind("id", segmentId).list()
);
if (!exists.isEmpty()) {
- log.info("Found [%s] in DB, not updating DB", segmentId);
+ log.info("Skipping publish of segment[%s] as it already exists in the
metadata store.", segmentId);
return;
}
dbi.withHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle)
- {
+ handle ->
handle.createStatement(statement)
.bind("id", segmentId)
.bind("dataSource", dataSource)
@@ -132,11 +115,7 @@ public class SQLMetadataSegmentPublisher implements
MetadataSegmentPublisher
.bind("used", used)
.bind("payload", payload)
.bind("used_status_last_updated", usedFlagLastUpdated)
- .execute();
-
- return null;
- }
- }
+ .execute()
);
}
catch (Exception e) {
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 1bf8ec534a9..0ddf4a2d2cc 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -64,7 +64,6 @@ import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import javax.annotation.Nullable;
@@ -72,6 +71,7 @@ import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -335,7 +335,7 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
{
ExecutorService executorService = Executors.newSingleThreadExecutor();
usedFlagLastUpdatedPopulationFuture = executorService.submit(
- () -> populateUsedFlagLastUpdated()
+ this::populateUsedFlagLastUpdated
);
}
@@ -347,70 +347,68 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
@VisibleForTesting
void populateUsedFlagLastUpdated()
{
- String segmentsTable = getSegmentsTable();
+ final String segmentsTable = getSegmentsTable();
log.info(
- "Populating used_status_last_updated with non-NULL values for unused
segments in [%s]",
+ "Populating column 'used_status_last_updated' with non-NULL values for
unused segments in table[%s].",
segmentsTable
);
- int limit = 100;
+ final int batchSize = 100;
int totalUpdatedEntries = 0;
+ // Update the rows in batches of size 100
while (true) {
- List<String> segmentsToUpdate = new ArrayList<>(100);
+ final List<String> segmentsToUpdate = new ArrayList<>(batchSize);
+ int numUpdatedRows;
try {
connector.retryWithHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle)
- {
- segmentsToUpdate.addAll(handle.createQuery(
- StringUtils.format(
- "SELECT id FROM %1$s WHERE used_status_last_updated IS
NULL and used = :used %2$s",
- segmentsTable,
- connector.limitClause(limit)
- )
- ).bind("used", false).mapTo(String.class).list());
- return null;
- }
+ handle -> {
+ segmentsToUpdate.addAll(handle.createQuery(
+ StringUtils.format(
+ "SELECT id FROM %1$s WHERE used_status_last_updated IS
NULL and used = :used %2$s",
+ segmentsTable,
+ connector.limitClause(batchSize)
+ )
+ ).bind("used", false).mapTo(String.class).list());
+ return null;
}
);
if (segmentsToUpdate.isEmpty()) {
- // We have no segments to process
break;
}
- connector.retryWithHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle)
- {
- Batch updateBatch = handle.createBatch();
- String sql = "UPDATE %1$s SET used_status_last_updated =
'%2$s' WHERE id = '%3$s'";
- String now = DateTimes.nowUtc().toString();
- for (String id : segmentsToUpdate) {
- updateBatch.add(StringUtils.format(sql, segmentsTable, now,
id));
- }
- updateBatch.execute();
- return null;
+ numUpdatedRows = connector.retryWithHandle(
+ handle -> {
+ final Batch updateBatch = handle.createBatch();
+ final String sql = "UPDATE %1$s SET used_status_last_updated =
'%2$s' WHERE id = '%3$s'";
+ String now = DateTimes.nowUtc().toString();
+ for (String id : segmentsToUpdate) {
+ updateBatch.add(StringUtils.format(sql, segmentsTable, now,
id));
}
+ int[] results = updateBatch.execute();
+ return Arrays.stream(results).sum();
}
);
+ totalUpdatedEntries += numUpdatedRows;
}
catch (Exception e) {
- log.warn(e, "Population of used_status_last_updated in [%s] has
failed. There may be unused segments with"
- + " NULL values for used_status_last_updated that won't be
killed!", segmentsTable);
+ log.warn(e, "Populating column 'used_status_last_updated' in table[%s]
has failed. There may be unused segments with"
+ + " NULL values for 'used_status_last_updated' that won't
be killed!", segmentsTable);
return;
}
- totalUpdatedEntries += segmentsToUpdate.size();
- log.info("Updated a batch of %d rows in [%s] with a valid
used_status_last_updated date",
- segmentsToUpdate.size(),
- segmentsTable
+ log.debug(
+ "Updated a batch of [%d] rows in table[%s] with a valid
used_status_last_updated date",
+ segmentsToUpdate.size(), segmentsTable
);
+
+ // Do not wait if there are no more segments to update
+ if (segmentsToUpdate.size() == numUpdatedRows && numUpdatedRows <
batchSize) {
+ break;
+ }
+
+ // Wait for some time before processing the next batch
try {
Thread.sleep(10000);
}
@@ -420,9 +418,8 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
}
}
log.info(
- "Finished updating [%s] with a valid used_status_last_updated date. %d
rows updated",
- segmentsTable,
- totalUpdatedEntries
+ "Populated column 'used_status_last_updated' in table[%s] in [%d]
rows.",
+ segmentsTable, totalUpdatedEntries
);
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerEmptyTest.java
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerEmptyTest.java
deleted file mode 100644
index 440aff3c084..00000000000
---
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerEmptyTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.metadata;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.druid.client.ImmutableDruidDataSource;
-import org.apache.druid.segment.TestHelper;
-import org.joda.time.Period;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.stream.Collectors;
-
-
-/**
- * Like {@link SQLMetadataRuleManagerTest} except with no segments to make
sure it behaves when it's empty
- */
-public class SqlSegmentsMetadataManagerEmptyTest
-{
-
- @Rule
- public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new
TestDerbyConnector.DerbyConnectorRule();
-
- private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
- private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
-
- @Before
- public void setUp()
- {
- TestDerbyConnector connector = derbyConnectorRule.getConnector();
- SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
- config.setPollDuration(Period.seconds(1));
- sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
- jsonMapper,
- Suppliers.ofInstance(config),
- derbyConnectorRule.metadataTablesConfigSupplier(),
- connector
- );
- sqlSegmentsMetadataManager.start();
-
- connector.createSegmentTable();
- }
-
- @After
- public void teardown()
- {
- if (sqlSegmentsMetadataManager.isPollingDatabasePeriodically()) {
- sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
- }
- sqlSegmentsMetadataManager.stop();
- }
-
- @Test
- public void testPollEmpty()
- {
- sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
- sqlSegmentsMetadataManager.poll();
-
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- Assert.assertEquals(
- ImmutableSet.of(),
- sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
- );
- Assert.assertEquals(
- ImmutableList.of(),
- sqlSegmentsMetadataManager
- .getImmutableDataSourcesWithAllUsedSegments()
- .stream()
- .map(ImmutableDruidDataSource::getName)
- .collect(Collectors.toList())
- );
- Assert.assertEquals(
- null,
-
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia")
- );
- Assert.assertEquals(
- ImmutableSet.of(),
-
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
- );
- }
-
- @Test
- public void testStopAndStart()
- {
- // Simulate successive losing and getting the coordinator leadership
- sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
- sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
- sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
- sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
- }
-}
diff --git
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index ab024d568db..85e5021f6c6 100644
---
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -46,101 +47,79 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
-import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class SqlSegmentsMetadataManagerTest
{
+ private static class DS
+ {
+ static final String WIKI = "wikipedia";
+ static final String KOALA = "koala";
+ }
+
private static DataSegment createSegment(
String dataSource,
String interval,
- String version,
- String bucketKey,
- int binaryVersion
+ String version
)
{
return new DataSegment(
dataSource,
Intervals.of(interval),
version,
- ImmutableMap.of(
- "type", "s3_zip",
- "bucket", "test",
- "key", dataSource + "/" + bucketKey
- ),
- ImmutableList.of("dim1", "dim2", "dim3"),
- ImmutableList.of("count", "value"),
+ ImmutableMap.of(),
+ ImmutableList.of(),
+ ImmutableList.of(),
NoneShardSpec.instance(),
- binaryVersion,
+ 9,
1234L
);
}
@Rule
- public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new
TestDerbyConnector.DerbyConnectorRule();
+ public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule
+ = new TestDerbyConnector.DerbyConnectorRule();
private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private SQLMetadataSegmentPublisher publisher;
- private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+ private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
- private final DataSegment segment1 = createSegment(
- "wikipedia",
- "2012-03-15T00:00:00.000/2012-03-16T00:00:00.000",
- "2012-03-16T00:36:30.848Z",
- "index/y=2012/m=03/d=15/2012-03-16T00:36:30.848Z/0/index.zip",
- 0
- );
+ private final DataSegment wikiSegment1 =
+
CreateDataSegments.ofDatasource(DS.WIKI).startingAt("2012-03-15").eachOfSizeInMb(500).get(0);
+ private final DataSegment wikiSegment2 =
+
CreateDataSegments.ofDatasource(DS.WIKI).startingAt("2012-01-05").eachOfSizeInMb(500).get(0);
- private final DataSegment segment2 = createSegment(
- "wikipedia",
- "2012-01-05T00:00:00.000/2012-01-06T00:00:00.000",
- "2012-01-06T22:19:12.565Z",
- "wikipedia/index/y=2012/m=01/d=05/2012-01-06T22:19:12.565Z/0/index.zip",
- 0
- );
-
- private void publish(DataSegment segment, boolean used) throws IOException
+ private void publishUnusedSegments(DataSegment... segments) throws
IOException
{
- publish(segment, used, DateTimes.nowUtc());
+ for (DataSegment segment : segments) {
+ publisher.publishSegment(segment);
+ sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId());
+ }
}
- private void publish(DataSegment segment, boolean used, DateTime
usedFlagLastUpdated) throws IOException
+ private void publishWikiSegments()
{
- boolean partitioned = !(segment.getShardSpec() instanceof NoneShardSpec);
-
- String usedFlagLastUpdatedStr = null;
- if (null != usedFlagLastUpdated) {
- usedFlagLastUpdatedStr = usedFlagLastUpdated.toString();
+ try {
+ publisher.publishSegment(wikiSegment1);
+ publisher.publishSegment(wikiSegment2);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
}
- publisher.publishSegment(
- segment.getId().toString(),
- segment.getDataSource(),
- DateTimes.nowUtc().toString(),
- segment.getInterval().getStart().toString(),
- segment.getInterval().getEnd().toString(),
- partitioned,
- segment.getVersion(),
- used,
- jsonMapper.writeValueAsBytes(segment),
- usedFlagLastUpdatedStr
- );
}
@Before
- public void setUp() throws Exception
+ public void setUp()
{
- TestDerbyConnector connector = derbyConnectorRule.getConnector();
+ final TestDerbyConnector connector = derbyConnectorRule.getConnector();
SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
- config.setPollDuration(Period.seconds(3));
+ config.setPollDuration(Period.millis(1));
sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
- jsonMapper,
+ JSON_MAPPER,
Suppliers.ofInstance(config),
derbyConnectorRule.metadataTablesConfigSupplier(),
connector
@@ -148,15 +127,12 @@ public class SqlSegmentsMetadataManagerTest
sqlSegmentsMetadataManager.start();
publisher = new SQLMetadataSegmentPublisher(
- jsonMapper,
+ JSON_MAPPER,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
connector
);
connector.createSegmentTable();
-
- publisher.publishSegment(segment1);
- publisher.publishSegment(segment2);
}
@After
@@ -168,9 +144,32 @@ public class SqlSegmentsMetadataManagerTest
sqlSegmentsMetadataManager.stop();
}
+ @Test
+ public void testPollEmpty()
+ {
+ sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
+ sqlSegmentsMetadataManager.poll();
+
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+ Assert.assertTrue(
+ sqlSegmentsMetadataManager.retrieveAllDataSourceNames().isEmpty()
+ );
+ Assert.assertEquals(
+ 0,
+ sqlSegmentsMetadataManager
+ .getImmutableDataSourcesWithAllUsedSegments()
+ .stream()
+ .map(ImmutableDruidDataSource::getName).count()
+ );
+
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.WIKI));
+ Assert.assertTrue(
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()).isEmpty()
+ );
+ }
+
@Test
public void testPollPeriodically()
{
+ publishWikiSegments();
DataSourcesSnapshot dataSourcesSnapshot =
sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertNull(dataSourcesSnapshot);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
@@ -180,22 +179,22 @@ public class SqlSegmentsMetadataManagerTest
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll()
instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
- ImmutableSet.of("wikipedia"),
+ ImmutableSet.of(DS.WIKI),
sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
);
Assert.assertEquals(
- ImmutableList.of("wikipedia"),
+ ImmutableList.of(DS.WIKI),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList())
);
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
-
ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
+
ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource(DS.WIKI).getSegments())
);
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
);
}
@@ -203,6 +202,7 @@ public class SqlSegmentsMetadataManagerTest
@Test
public void testPollOnDemand()
{
+ publishWikiSegments();
DataSourcesSnapshot dataSourcesSnapshot =
sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertNull(dataSourcesSnapshot);
// This should return false and not wait/poll anything as we did not
schedule periodic poll
@@ -214,22 +214,22 @@ public class SqlSegmentsMetadataManagerTest
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll()
instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
- ImmutableSet.of("wikipedia"),
+ ImmutableSet.of(DS.WIKI),
sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
);
Assert.assertEquals(
- ImmutableList.of("wikipedia"),
+ ImmutableList.of(DS.WIKI),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList())
);
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
-
ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
+
ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource(DS.WIKI).getSegments())
);
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
);
}
@@ -237,6 +237,7 @@ public class SqlSegmentsMetadataManagerTest
@Test(timeout = 60_000)
public void testPollPeriodicallyAndOnDemandInterleave() throws Exception
{
+ publishWikiSegments();
DataSourcesSnapshot dataSourcesSnapshot =
sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertNull(dataSourcesSnapshot);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
@@ -246,15 +247,13 @@ public class SqlSegmentsMetadataManagerTest
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll()
instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
- ImmutableList.of("wikipedia"),
+ ImmutableList.of(DS.WIKI),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList())
);
- final String newDataSource2 = "wikipedia2";
- final DataSegment newSegment2 = createNewSegment1(newDataSource2);
- publisher.publishSegment(newSegment2);
+ publisher.publishSegment(createNewSegment1(DS.KOALA));
// This call will force on demand poll
sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll();
@@ -263,7 +262,7 @@ public class SqlSegmentsMetadataManagerTest
// New datasource should now be in the snapshot since we just force on
demand poll.
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
- ImmutableList.of("wikipedia2", "wikipedia"),
+ ImmutableList.of(DS.KOALA, DS.WIKI),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
@@ -271,8 +270,7 @@ public class SqlSegmentsMetadataManagerTest
);
final String newDataSource3 = "wikipedia3";
- final DataSegment newSegment3 = createNewSegment1(newDataSource3);
- publisher.publishSegment(newSegment3);
+ publisher.publishSegment(createNewSegment1(newDataSource3));
// This time wait for periodic poll (not doing on demand poll so we have
to wait a bit...)
while
(sqlSegmentsMetadataManager.getDataSourcesSnapshot().getDataSource(newDataSource3)
== null) {
@@ -282,7 +280,7 @@ public class SqlSegmentsMetadataManagerTest
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll()
instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
- ImmutableSet.of("wikipedia2", "wikipedia3", "wikipedia"),
+ ImmutableSet.of(DS.KOALA, "wikipedia3", DS.WIKI),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
@@ -293,29 +291,32 @@ public class SqlSegmentsMetadataManagerTest
@Test
public void
testPrepareImmutableDataSourceWithUsedSegmentsAwaitsPollOnRestart() throws
IOException
{
- DataSegment newSegment = pollThenStopThenStartIntro();
+ publishWikiSegments();
+ DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment();
Assert.assertEquals(
- ImmutableSet.of(newSegment),
-
ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia2").getSegments())
+ ImmutableSet.of(koalaSegment),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA).getSegments())
);
}
@Test
public void testGetDataSourceWithUsedSegmentsAwaitsPollOnRestart() throws
IOException
{
- DataSegment newSegment = pollThenStopThenStartIntro();
+ publishWikiSegments();
+ DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment();
Assert.assertEquals(
- ImmutableSet.of(newSegment),
-
ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia2").getSegments())
+ ImmutableSet.of(koalaSegment),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA).getSegments())
);
}
@Test
public void
testPrepareImmutableDataSourcesWithAllUsedSegmentsAwaitsPollOnRestart() throws
IOException
{
- DataSegment newSegment = pollThenStopThenStartIntro();
+ publishWikiSegments();
+ DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2, newSegment),
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment),
ImmutableSet.copyOf(
sqlSegmentsMetadataManager
.getImmutableDataSourcesWithAllUsedSegments()
@@ -329,54 +330,49 @@ public class SqlSegmentsMetadataManagerTest
@Test
public void testIterateAllUsedSegmentsAwaitsPollOnRestart() throws
IOException
{
- DataSegment newSegment = pollThenStopThenStartIntro();
+ publishWikiSegments();
+ DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2, newSegment),
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
- private DataSegment pollThenStopThenStartIntro() throws IOException
+ private DataSegment pollThenStopThenPublishKoalaSegment() throws IOException
{
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertEquals(
- ImmutableSet.of("wikipedia"),
+ ImmutableSet.of(DS.WIKI),
sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
);
- DataSegment newSegment = createNewSegment1("wikipedia2");
- publisher.publishSegment(newSegment);
+ final DataSegment koalaSegment = createNewSegment1(DS.KOALA);
+ publisher.publishSegment(koalaSegment);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
- return newSegment;
+ return koalaSegment;
}
-
+ /**
+ * Create a corrupted segment entry in the segments table to test
+ * whether the overall loading of segments from the database continues to
work
+ * even if one of the entries is corrupted.
+ */
@Test
- public void testPollWithCorruptedSegment()
- {
- //create a corrupted segment entry in segments table, which tests
- //that overall loading of segments from database continues to work
- //even in one of the entries are corrupted.
- publisher.publishSegment(
- "corrupt-segment-id",
- "corrupt-datasource",
- "corrupt-create-date",
- "corrupt-start-date",
- "corrupt-end-date",
- true,
- "corrupt-version",
- true,
- StringUtils.toUtf8("corrupt-payload"),
- "corrupt-last-used-date"
- );
+ public void testPollWithCorruptedSegment() throws IOException
+ {
+ publishWikiSegments();
+
+ final DataSegment corruptSegment =
DataSegment.builder(wikiSegment1).dataSource("corrupt-datasource").build();
+ publisher.publishSegment(corruptSegment);
+ updateSegmentPayload(corruptSegment,
StringUtils.toUtf8("corrupt-payload"));
EmittingLogger.registerEmitter(new NoopServiceEmitter());
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertEquals(
- "wikipedia",
+ DS.WIKI,
Iterables.getOnlyElement(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).getName()
);
}
@@ -384,66 +380,66 @@ public class SqlSegmentsMetadataManagerTest
@Test
public void testGetUnusedSegmentIntervals() throws IOException
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
- // We alter the segment table to allow nullable used_status_last_updated
in order to test compatibility during druid upgrade from version without
used_status_last_updated.
- derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable();
+ // Allow null values of used_status_last_updated to test upgrade from
older Druid versions
+ allowUsedFlagLastUpdatedToBeNullable();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- int numChangedSegments =
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia");
+ int numChangedSegments =
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.WIKI);
Assert.assertEquals(2, numChangedSegments);
- String newDs = "newDataSource";
- final DataSegment newSegment = createSegment(
- newDs,
+ // Publish an unused segment with used_status_last_updated 2 hours ago
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment, false,
DateTimes.nowUtc().minus(Duration.parse("PT7200S").getMillis()));
+ publishUnusedSegments(koalaSegment1);
+ updateUsedStatusLastUpdated(koalaSegment1,
DateTimes.nowUtc().minus(Duration.standardHours(2)));
- final DataSegment newSegment2 = createSegment(
- newDs,
+ // Publish an unused segment with used_status_last_updated 2 days ago
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
"2017-10-16T00:00:00.000/2017-10-17T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment2, false,
DateTimes.nowUtc().minus(Duration.parse("PT172800S").getMillis()));
+ publishUnusedSegments(koalaSegment2);
+ updateUsedStatusLastUpdated(koalaSegment2,
DateTimes.nowUtc().minus(Duration.standardDays(2)));
- final DataSegment newSegment3 = createSegment(
- newDs,
+ // Publish an unused segment and set used_status_last_updated to null
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment3, false, null);
+ publishUnusedSegments(koalaSegment3);
+ updateUsedStatusLastUpdatedToNull(koalaSegment3);
Assert.assertEquals(
- ImmutableList.of(segment2.getInterval()),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
null, DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ ImmutableList.of(wikiSegment2.getInterval()),
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null,
DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
// Test the DateTime maxEndTime argument of getUnusedSegmentIntervals
Assert.assertEquals(
- ImmutableList.of(segment2.getInterval()),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
null, DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ ImmutableList.of(wikiSegment2.getInterval()),
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null,
DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
Assert.assertEquals(
- ImmutableList.of(segment1.getInterval()),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1,
DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ ImmutableList.of(wikiSegment1.getInterval()),
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI,
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1,
DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
Assert.assertEquals(
ImmutableList.of(),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1,
DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI,
DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1,
DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
Assert.assertEquals(
- ImmutableList.of(segment2.getInterval(), segment1.getInterval()),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
null, DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+ ImmutableList.of(wikiSegment2.getInterval(),
wikiSegment1.getInterval()),
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null,
DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
// Test a buffer period that should exclude some segments
@@ -451,15 +447,21 @@ public class SqlSegmentsMetadataManagerTest
// The wikipedia datasource has segments generated with last used time
equal to roughly the time of test run. None of these segments should be
selected with a buffer period of 1 day
Assert.assertEquals(
ImmutableList.of(),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia",
DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5,
DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI,
DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5,
DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
);
- // One of the 3 segments in newDs has a null used_status_last_updated
which should mean getUnusedSegmentIntervals never returns it
- // One of the 3 segments in newDs has a used_status_last_updated older
than 1 day which means it should also be returned
- // The last of the 3 segemns in newDs has a used_status_last_updated date
less than one day and should not be returned
+ // koalaSegment3 has a null used_status_last_updated which should mean
getUnusedSegmentIntervals never returns it
+ // koalaSegment2 has a used_status_last_updated older than 1 day which
means it should be returned
+ // koalaSegment1 has a used_status_last_updated less than 1 day old which
means it should not be returned
Assert.assertEquals(
- ImmutableList.of(newSegment2.getInterval()),
- sqlSegmentsMetadataManager.getUnusedSegmentIntervals(newDs,
DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5,
DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
+ ImmutableList.of(koalaSegment2.getInterval()),
+ sqlSegmentsMetadataManager.getUnusedSegmentIntervals(
+ DS.KOALA,
+ DateTimes.COMPARE_DATE_AS_STRING_MIN,
+ DateTimes.of("3000"),
+ 5,
+ DateTimes.nowUtc().minus(Duration.parse("PT86400S"))
+ )
);
}
@@ -470,37 +472,30 @@ public class SqlSegmentsMetadataManagerTest
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment = createNewSegment1(newDataSource);
-
- publisher.publishSegment(newSegment);
+ publisher.publishSegment(createNewSegment1(DS.KOALA));
- awaitDataSourceAppeared(newDataSource);
- int numChangedSegments =
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(newDataSource);
+ awaitDataSourceAppeared(DS.KOALA);
+ int numChangedSegments =
sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.KOALA);
Assert.assertEquals(1, numChangedSegments);
- awaitDataSourceDisappeared(newDataSource);
-
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+ awaitDataSourceDisappeared(DS.KOALA);
+
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
}
- private static DataSegment createNewSegment1(String newDataSource)
+ private static DataSegment createNewSegment1(String datasource)
{
return createSegment(
- newDataSource,
+ datasource,
"2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
}
- private static DataSegment createNewSegment2(String newDataSource)
+ private static DataSegment createNewSegment2(String datasource)
{
return createSegment(
- newDataSource,
+ datasource,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
}
@@ -511,188 +506,171 @@ public class SqlSegmentsMetadataManagerTest
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment = createSegment(
- newDataSource,
+ final DataSegment koalaSegment = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publisher.publishSegment(newSegment);
- awaitDataSourceAppeared(newDataSource);
-
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+ publisher.publishSegment(koalaSegment);
+ awaitDataSourceAppeared(DS.KOALA);
+
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
-
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId()));
- awaitDataSourceDisappeared(newDataSource);
-
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
+
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(koalaSegment.getId()));
+ awaitDataSourceDisappeared(DS.KOALA);
+
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA));
}
- private void awaitDataSourceAppeared(String newDataSource) throws
InterruptedException
+ private void awaitDataSourceAppeared(String datasource) throws
InterruptedException
{
- while
(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource)
== null) {
- Thread.sleep(1000);
+ while
(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(datasource)
== null) {
+ Thread.sleep(5);
}
}
private void awaitDataSourceDisappeared(String dataSource) throws
InterruptedException
{
while
(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(dataSource)
!= null) {
- Thread.sleep(1000);
+ Thread.sleep(5);
}
}
@Test
public void testMarkAsUsedNonOvershadowedSegments() throws Exception
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- final DataSegment newSegment2 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-16T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 1
+ "2017-10-16T20:19:12.565Z"
);
- // Overshadowed by newSegment2
- final DataSegment newSegment3 = createSegment(
- newDataSource,
+ // Overshadowed by koalaSegment2
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 1
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment1, false);
- publish(newSegment2, false);
- publish(newSegment3, false);
- final ImmutableSet<String> segmentIds = ImmutableSet.of(
- newSegment1.getId().toString(),
- newSegment2.getId().toString(),
- newSegment3.getId().toString()
+ publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
+ final Set<String> segmentIds = ImmutableSet.of(
+ koalaSegment1.getId().toString(),
+ koalaSegment2.getId().toString(),
+ koalaSegment3.getId().toString()
);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
- Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource,
segmentIds));
+ Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA,
segmentIds));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1,
koalaSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
- @Test(expected = UnknownSegmentIdsException.class)
+ @Test
public void testMarkAsUsedNonOvershadowedSegmentsInvalidDataSource() throws
Exception
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
- final DataSegment newSegment2 = createNewSegment1(newDataSource);
+ final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+ final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
- publish(newSegment1, false);
- publish(newSegment2, false);
+ publishUnusedSegments(koalaSegment1, koalaSegment2);
final ImmutableSet<String> segmentIds =
- ImmutableSet.of(newSegment1.getId().toString(),
newSegment2.getId().toString());
+ ImmutableSet.of(koalaSegment1.getId().toString(),
koalaSegment2.getId().toString());
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
- // none of the segments are in data source
- Assert.assertEquals(0,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource",
segmentIds));
+
+ Assert.assertThrows(
+ UnknownSegmentIdsException.class,
+ () ->
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource",
segmentIds)
+ );
}
- @Test(expected = UnknownSegmentIdsException.class)
- public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds()
throws UnknownSegmentIdsException
+ @Test
+ public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds()
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
- final DataSegment newSegment2 = createNewSegment1(newDataSource);
+ final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+ final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
final ImmutableSet<String> segmentIds =
- ImmutableSet.of(newSegment1.getId().toString(),
newSegment2.getId().toString());
+ ImmutableSet.of(koalaSegment1.getId().toString(),
koalaSegment2.getId().toString());
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
- // none of the segments are in data source
- Assert.assertEquals(0,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource,
segmentIds));
+
+ Assert.assertThrows(
+ UnknownSegmentIdsException.class,
+ () ->
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA,
segmentIds)
+ );
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsInInterval() throws
IOException
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
- final DataSegment newSegment2 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-16T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 1
+ "2017-10-16T20:19:12.565Z"
);
-
- final DataSegment newSegment3 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
"2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- // Overshadowed by newSegment2
- final DataSegment newSegment4 = createNewSegment2(newDataSource);
+ // Overshadowed by koalaSegment2
+ final DataSegment koalaSegment4 = createNewSegment2(DS.KOALA);
- publish(newSegment1, false);
- publish(newSegment2, false);
- publish(newSegment3, false);
- publish(newSegment4, false);
+ publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3,
koalaSegment4);
final Interval theInterval =
Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
// 3 out of 4 segments match the interval, but one of them is overshadowed
- Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(newDataSource,
theInterval));
+ Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA,
theInterval));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1,
koalaSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@@ -700,56 +678,47 @@ public class SqlSegmentsMetadataManagerTest
@Test
public void
testMarkAsUsedNonOvershadowedSegmentsInIntervalWithOverlappingInterval() throws
IOException
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- final DataSegment newSegment2 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-16T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 1
+ "2017-10-16T20:19:12.565Z"
);
- final DataSegment newSegment3 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
"2017-10-19T00:00:00.000/2017-10-22T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- // Overshadowed by newSegment2
- final DataSegment newSegment4 = createNewSegment2(newDataSource);
+ // Overshadowed by koalaSegment2
+ final DataSegment koalaSegment4 = createNewSegment2(DS.KOALA);
- publish(newSegment1, false);
- publish(newSegment2, false);
- publish(newSegment3, false);
- publish(newSegment4, false);
+ publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3,
koalaSegment4);
final Interval theInterval =
Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
// 1 out of 3 segments match the interval, other 2 overlap, only the
segment fully contained will be marked used
- Assert.assertEquals(1,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(newDataSource,
theInterval));
+ Assert.assertEquals(1,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA,
theInterval));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2, newSegment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@@ -757,24 +726,24 @@ public class SqlSegmentsMetadataManagerTest
@Test
public void testMarkSegmentsAsUnused() throws IOException
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createNewSegment1(newDataSource);
+ final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+ final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA);
- final DataSegment newSegment2 = createNewSegment1(newDataSource);
+ publisher.publishSegment(koalaSegment1);
+ publisher.publishSegment(koalaSegment2);
- publisher.publishSegment(newSegment1);
- publisher.publishSegment(newSegment2);
final ImmutableSet<SegmentId> segmentIds =
- ImmutableSet.of(newSegment1.getId(), newSegment1.getId());
+ ImmutableSet.of(koalaSegment1.getId(), koalaSegment1.getId());
Assert.assertEquals(segmentIds.size(),
sqlSegmentsMetadataManager.markSegmentsAsUnused(segmentIds));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2),
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@@ -782,34 +751,30 @@ public class SqlSegmentsMetadataManagerTest
@Test
public void testMarkAsUnusedSegmentsInInterval() throws IOException
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createNewSegment1(newDataSource);
-
- final DataSegment newSegment2 = createNewSegment2(newDataSource);
-
- final DataSegment newSegment3 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA);
+ final DataSegment koalaSegment2 = createNewSegment2(DS.KOALA);
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
"2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publisher.publishSegment(newSegment1);
- publisher.publishSegment(newSegment2);
- publisher.publishSegment(newSegment3);
+ publisher.publishSegment(koalaSegment1);
+ publisher.publishSegment(koalaSegment2);
+ publisher.publishSegment(koalaSegment3);
final Interval theInterval =
Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");
// 2 out of 3 segments match the interval
- Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(newDataSource,
theInterval));
+ Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA,
theInterval));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2, newSegment3),
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment3),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@@ -817,40 +782,34 @@ public class SqlSegmentsMetadataManagerTest
@Test
public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval()
throws IOException
{
+ publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
- final String newDataSource = "wikipedia2";
- final DataSegment newSegment1 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
-
- final DataSegment newSegment2 = createNewSegment2(newDataSource);
-
- final DataSegment newSegment3 = createSegment(
- newDataSource,
+ final DataSegment koalaSegment2 = createNewSegment2(DS.KOALA);
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
"2017-10-19T00:00:00.000/2017-10-22T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publisher.publishSegment(newSegment1);
- publisher.publishSegment(newSegment2);
- publisher.publishSegment(newSegment3);
+ publisher.publishSegment(koalaSegment1);
+ publisher.publishSegment(koalaSegment2);
+ publisher.publishSegment(koalaSegment3);
final Interval theInterval =
Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");
// 1 out of 3 segments match the interval, other 2 overlap, only the
segment fully contained will be marked unused
- Assert.assertEquals(1,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(newDataSource,
theInterval));
+ Assert.assertEquals(1,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA,
theInterval));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
- ImmutableSet.of(segment1, segment2, newSegment1, newSegment3),
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1,
koalaSegment3),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@@ -868,81 +827,133 @@ public class SqlSegmentsMetadataManagerTest
@Test
public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval()
throws Exception
{
+ publishWikiSegments();
final Interval theInterval =
Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000");
- Optional<Iterable<DataSegment>> segments =
sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
- "wikipedia", theInterval, true
+
+ // Re-create SqlSegmentsMetadataManager with a higher poll duration
+ final SegmentsMetadataManagerConfig config = new
SegmentsMetadataManagerConfig();
+ config.setPollDuration(Period.seconds(1));
+ sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
+ JSON_MAPPER,
+ Suppliers.ofInstance(config),
+ derbyConnectorRule.metadataTablesConfigSupplier(),
+ derbyConnectorRule.getConnector()
);
+ sqlSegmentsMetadataManager.start();
+
+ Optional<Iterable<DataSegment>> segments = sqlSegmentsMetadataManager
+ .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DS.WIKI,
theInterval, true);
Assert.assertTrue(segments.isPresent());
Set<DataSegment> dataSegmentSet = ImmutableSet.copyOf(segments.get());
Assert.assertEquals(1, dataSegmentSet.size());
- Assert.assertTrue(dataSegmentSet.contains(segment1));
+ Assert.assertTrue(dataSegmentSet.contains(wikiSegment1));
- final DataSegment newSegment2 = createSegment(
- "wikipedia",
+ final DataSegment wikiSegment3 = createSegment(
+ DS.WIKI,
"2012-03-16T00:00:00.000/2012-03-17T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
- "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publisher.publishSegment(newSegment2);
+ publisher.publishSegment(wikiSegment3);
// New segment is not returned since we call without force poll
- segments =
sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
- "wikipedia", theInterval, false
- );
+ segments = sqlSegmentsMetadataManager
+ .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DS.WIKI,
theInterval, false);
Assert.assertTrue(segments.isPresent());
dataSegmentSet = ImmutableSet.copyOf(segments.get());
Assert.assertEquals(1, dataSegmentSet.size());
- Assert.assertTrue(dataSegmentSet.contains(segment1));
+ Assert.assertTrue(dataSegmentSet.contains(wikiSegment1));
// New segment is returned since we call with force poll
- segments =
sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
- "wikipedia", theInterval, true
- );
+ segments = sqlSegmentsMetadataManager
+ .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DS.WIKI,
theInterval, true);
Assert.assertTrue(segments.isPresent());
dataSegmentSet = ImmutableSet.copyOf(segments.get());
Assert.assertEquals(2, dataSegmentSet.size());
- Assert.assertTrue(dataSegmentSet.contains(segment1));
- Assert.assertTrue(dataSegmentSet.contains(newSegment2));
+ Assert.assertTrue(dataSegmentSet.contains(wikiSegment1));
+ Assert.assertTrue(dataSegmentSet.contains(wikiSegment3));
}
@Test
public void testPopulateUsedFlagLastUpdated() throws IOException
{
- derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable();
- final DataSegment newSegment = createSegment(
- "dummyDS",
+ allowUsedFlagLastUpdatedToBeNullable();
+ final DataSegment koalaSegment = createSegment(
+ DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
- "2017-10-15T20:19:12.565Z",
-
"wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
- 0
+ "2017-10-15T20:19:12.565Z"
);
- publish(newSegment, false, null);
- Assert.assertTrue(getCountOfRowsWithLastUsedNull() > 0);
+
+ publishUnusedSegments(koalaSegment);
+ updateUsedStatusLastUpdatedToNull(koalaSegment);
+
+ Assert.assertEquals(1, getCountOfRowsWithLastUsedNull());
sqlSegmentsMetadataManager.populateUsedFlagLastUpdated();
- Assert.assertTrue(getCountOfRowsWithLastUsedNull() == 0);
+ Assert.assertEquals(0, getCountOfRowsWithLastUsedNull());
+ }
+
+ private void updateSegmentPayload(DataSegment segment, byte[] payload)
+ {
+ executeUpdate(
+ "UPDATE %1$s SET PAYLOAD = ? WHERE ID = ?",
+ payload,
+ segment.getId().toString()
+ );
}
private int getCountOfRowsWithLastUsedNull()
{
return derbyConnectorRule.getConnector().retryWithHandle(
- new HandleCallback<Integer>()
- {
- @Override
- public Integer withHandle(Handle handle)
- {
- List<Map<String, Object>> lst = handle.select(
- StringUtils.format(
- "SELECT * FROM %1$s WHERE USED_STATUS_LAST_UPDATED IS
NULL",
- derbyConnectorRule.metadataTablesConfigSupplier()
- .get()
- .getSegmentsTable()
- .toUpperCase(Locale.ENGLISH)
- )
- );
- return lst.size();
- }
- }
+ handle -> handle.select(
+ StringUtils.format(
+ "SELECT ID FROM %1$s WHERE USED_STATUS_LAST_UPDATED IS NULL",
+ getSegmentsTable()
+ )
+ ).size()
);
}
+
+ private void updateUsedStatusLastUpdated(DataSegment segment, DateTime
newValue)
+ {
+ executeUpdate(
+ "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?",
+ newValue.toString(),
+ segment.getId().toString()
+ );
+ }
+
+ private void updateUsedStatusLastUpdatedToNull(DataSegment segment)
+ {
+ executeUpdate(
+ "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = NULL WHERE ID = ?",
+ segment.getId().toString()
+ );
+ }
+
+ private void executeUpdate(String sqlFormat, Object... args)
+ {
+ derbyConnectorRule.getConnector().retryWithHandle(
+ handle -> handle.update(
+ StringUtils.format(sqlFormat, getSegmentsTable()),
+ args
+ )
+ );
+ }
+
+ /**
+ * Alters the column used_status_last_updated to be nullable. This is used to
+ * test backward compatibility with versions of Druid without this column
+ * present in the segments table.
+ */
+ private void allowUsedFlagLastUpdatedToBeNullable()
+ {
+ executeUpdate("ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED
NULL");
+ }
+
+ private String getSegmentsTable()
+ {
+ return derbyConnectorRule.metadataTablesConfigSupplier()
+ .get()
+ .getSegmentsTable()
+ .toUpperCase(Locale.ENGLISH);
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
index d0d8357837c..e5460ce402b 100644
--- a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
+++ b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
@@ -25,14 +25,10 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.storage.derby.DerbyConnector;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
-import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
-import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
-import java.util.Locale;
import java.util.UUID;
public class TestDerbyConnector extends DerbyConnector
@@ -139,27 +135,5 @@ public class TestDerbyConnector extends DerbyConnector
{
return dbTables;
}
-
- public void allowUsedFlagLastUpdatedToBeNullable()
- {
- connector.retryWithHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle)
- {
- final Batch batch = handle.createBatch();
- batch.add(
- StringUtils.format(
- "ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED
NULL",
-
dbTables.get().getSegmentsTable().toUpperCase(Locale.ENGLISH)
- )
- );
- batch.execute();
- return null;
- }
- }
- );
- }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]