gianm commented on a change in pull request #7653: Refactor
SQLMetadataSegmentManager; Change contract of REST methods in
DataSourcesResource
URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r286368954
##########
File path:
server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
##########
@@ -197,366 +320,489 @@ private Runnable createPollTaskForStartOrder(long
startOrder)
}
@Override
- @LifecycleStop
- public void stop()
+ public void stopPollingDatabasePeriodically()
{
- ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
lock.lock();
try {
- if (!isStarted()) {
+ if (!isPollingDatabasePeriodically()) {
return;
}
- dataSources = null;
- currentStartOrder = -1;
- exec.shutdownNow();
- exec = null;
+ periodicPollTaskFuture.cancel(false);
+ latestDatabasePoll = null;
+
+ // NOT nulling dataSources, allowing to query the latest polled data
even when this SegmentsMetadata object is
+ // stopped.
+
+ currentStartPollingOrder = -1;
}
finally {
lock.unlock();
}
}
- private Pair<DataSegment, Boolean> usedPayloadMapper(
- final int index,
- final ResultSet resultSet,
- final StatementContext context
- ) throws SQLException
+ private void awaitOrPerformDatabasePoll()
{
+ // Double-checked locking with awaitPeriodicOrFreshOnDemandDatabasePoll()
call playing the role of the "check".
+ if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+ return;
+ }
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+ lock.lock();
try {
- return new Pair<>(
- jsonMapper.readValue(resultSet.getBytes("payload"),
DataSegment.class),
- resultSet.getBoolean("used")
- );
+ if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+ return;
+ }
+ OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll();
+ this.latestDatabasePoll = newOnDemandUpdate;
+ doOnDemandPoll(newOnDemandUpdate);
}
- catch (IOException e) {
- throw new RuntimeException(e);
+ finally {
+ lock.unlock();
}
}
- /**
- * Gets a list of all datasegments that overlap the provided interval along
with thier used status.
- */
- private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
- final String dataSource,
- final Interval interval
- )
- {
- return connector.inReadOnlyTransaction(
- (handle, status) -> handle.createQuery(
- StringUtils.format(
- "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource
AND start < :end AND %2$send%2$s > :start",
- getSegmentsTable(),
- connector.getQuoteString()
- )
- )
- .setFetchSize(connector.getStreamingFetchSize())
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map(this::usedPayloadMapper)
- .list()
- );
- }
-
- private List<Pair<DataSegment, Boolean>> getDataSegments(
- final String dataSource,
- final Collection<String> segmentIds,
- final Handle handle
- )
+ private boolean awaitPeriodicOrFreshOnDemandDatabasePoll()
{
- return segmentIds.stream().map(
- segmentId -> Optional.ofNullable(
- handle.createQuery(
- StringUtils.format(
- "SELECT used, payload FROM %1$s WHERE dataSource =
:dataSource AND id = :id",
- getSegmentsTable()
- )
- )
- .bind("dataSource", dataSource)
- .bind("id", segmentId)
- .map(this::usedPayloadMapper)
- .first()
- )
- .orElseThrow(() -> new
UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]",
segmentId)))
- )
- .collect(Collectors.toList());
+ DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
Review comment:
It looks like the idea is that if we are in the midst of a leadership epoch,
then:
- `isPollingDatabasePeriodically` will return true
- `latestDatabasePoll` will be a PeriodicDatabasePoll
- we are guaranteed to wait for that PeriodicDatabasePoll to complete its
first poll, if `awaitPeriodicOrFreshOnDemandDatabasePoll` is called after the
leadership epoch starts
Is that right? If so, it should provide the property I mentioned in another
comment (preventing leadership epoch N from using an older snapshot than
leadership epoch N-1). Assuming that's the case, IMO it would be nice to have
some comments or assertions documenting these class invariants.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]