leventov commented on a change in pull request #7653: Refactor
SQLMetadataSegmentManager; Change contract of REST methods in
DataSourcesResource
URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r287358254
##########
File path:
server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
##########
@@ -197,366 +320,489 @@ private Runnable createPollTaskForStartOrder(long
startOrder)
}
@Override
- @LifecycleStop
- public void stop()
+ public void stopPollingDatabasePeriodically()
{
- ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
lock.lock();
try {
- if (!isStarted()) {
+ if (!isPollingDatabasePeriodically()) {
return;
}
- dataSources = null;
- currentStartOrder = -1;
- exec.shutdownNow();
- exec = null;
+ periodicPollTaskFuture.cancel(false);
+ latestDatabasePoll = null;
+
+ // NOT nulling dataSources, allowing to query the latest polled data
even when this SegmentsMetadata object is
+ // stopped.
+
+ currentStartPollingOrder = -1;
}
finally {
lock.unlock();
}
}
- private Pair<DataSegment, Boolean> usedPayloadMapper(
- final int index,
- final ResultSet resultSet,
- final StatementContext context
- ) throws SQLException
+ private void awaitOrPerformDatabasePoll()
{
+ // Double-checked locking with awaitPeriodicOrFreshOnDemandDatabasePoll()
call playing the role of the "check".
+ if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+ return;
+ }
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+ lock.lock();
try {
- return new Pair<>(
- jsonMapper.readValue(resultSet.getBytes("payload"),
DataSegment.class),
- resultSet.getBoolean("used")
- );
+ if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+ return;
+ }
+ OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll();
+ this.latestDatabasePoll = newOnDemandUpdate;
+ doOnDemandPoll(newOnDemandUpdate);
}
- catch (IOException e) {
- throw new RuntimeException(e);
+ finally {
+ lock.unlock();
}
}
- /**
- * Gets a list of all datasegments that overlap the provided interval along
with thier used status.
- */
- private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
- final String dataSource,
- final Interval interval
- )
- {
- return connector.inReadOnlyTransaction(
- (handle, status) -> handle.createQuery(
- StringUtils.format(
- "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource
AND start < :end AND %2$send%2$s > :start",
- getSegmentsTable(),
- connector.getQuoteString()
- )
- )
- .setFetchSize(connector.getStreamingFetchSize())
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map(this::usedPayloadMapper)
- .list()
- );
- }
-
- private List<Pair<DataSegment, Boolean>> getDataSegments(
- final String dataSource,
- final Collection<String> segmentIds,
- final Handle handle
- )
+ private boolean awaitPeriodicOrFreshOnDemandDatabasePoll()
Review comment:
Renamed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]