surekhasaharan commented on a change in pull request #7653: Refactor
SQLMetadataSegmentManager; Change contract of REST methods in
DataSourcesResource
URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r287892431
##########
File path:
server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
##########
@@ -197,366 +340,511 @@ private Runnable createPollTaskForStartOrder(long
startOrder)
}
@Override
- @LifecycleStop
- public void stop()
+ public boolean isPollingDatabasePeriodically()
{
- ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+ // isPollingDatabasePeriodically() is synchronized together with
startPollingDatabasePeriodically(),
+ // stopPollingDatabasePeriodically() and poll() to ensure that the latest
currentStartPollingOrder is always
+ // visible. readLock should be used to avoid unexpected performance
degradation of DruidCoordinator.
+ ReentrantReadWriteLock.ReadLock lock = startStopPollLock.readLock();
lock.lock();
try {
- if (!isStarted()) {
+ return currentStartPollingOrder >= 0;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void stopPollingDatabasePeriodically()
+ {
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+ lock.lock();
+ try {
+ if (!isPollingDatabasePeriodically()) {
return;
}
- dataSources = null;
- currentStartOrder = -1;
- exec.shutdownNow();
- exec = null;
+ periodicPollTaskFuture.cancel(false);
+ latestDatabasePoll = null;
+
+ // NOT nulling dataSources, allowing queries of the latest polled data
even when this SegmentsMetadata object is
+ // stopped.
+
+ currentStartPollingOrder = -1;
}
finally {
lock.unlock();
}
}
- private Pair<DataSegment, Boolean> usedPayloadMapper(
- final int index,
- final ResultSet resultSet,
- final StatementContext context
- ) throws SQLException
+ private void awaitOrPerformDatabasePoll()
{
+ // Double-checked locking with awaitLatestDatabasePoll() call playing the
role of the "check".
+ if (awaitLatestDatabasePoll()) {
+ return;
+ }
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+ lock.lock();
try {
- return new Pair<>(
- jsonMapper.readValue(resultSet.getBytes("payload"),
DataSegment.class),
- resultSet.getBoolean("used")
- );
+ if (awaitLatestDatabasePoll()) {
+ return;
+ }
+ OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll();
+ this.latestDatabasePoll = newOnDemandUpdate;
+ doOnDemandPoll(newOnDemandUpdate);
}
- catch (IOException e) {
- throw new RuntimeException(e);
+ finally {
+ lock.unlock();
}
}
/**
- * Gets a list of all data segments that overlap the provided interval along
with their used status.
+ * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or
an {@link OnDemandDatabasePoll} that is
+ * made no longer than {@link #periodicPollDelay} ago, awaits it
and returns true; returns false otherwise,
+ * meaning that a new on-demand database poll should be initiated.
*/
- private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
- final String dataSource,
- final Interval interval
- )
+ private boolean awaitLatestDatabasePoll()
{
- return connector.inReadOnlyTransaction(
- (handle, status) -> handle.createQuery(
- StringUtils.format(
- "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource
AND start < :end AND %2$send%2$s > :start",
- getSegmentsTable(),
- connector.getQuoteString()
- )
- )
- .setFetchSize(connector.getStreamingFetchSize())
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map(this::usedPayloadMapper)
- .list()
- );
+ DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
+ if (latestDatabasePoll instanceof PeriodicDatabasePoll) {
+ Futures.getUnchecked(((PeriodicDatabasePoll)
latestDatabasePoll).firstPollCompletionFuture);
+ return true;
+ }
+ if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
+ long periodicPollDelayNanos =
TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
+ OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll)
latestDatabasePoll;
+ boolean latestUpdateIsFresh =
latestOnDemandPoll.nanosElapsedFromInitiation() < periodicPollDelayNanos;
+ if (latestUpdateIsFresh) {
+ Futures.getUnchecked(latestOnDemandPoll.pollCompletionFuture);
+ return true;
+ }
+ // Latest on-demand update is not fresh. Fall through to return false
from this method.
+ } else {
+ assert latestDatabasePoll == null;
+ // No periodic updates and no on-demand database poll have been done
yet, nothing to await for.
+ }
+ return false;
}
- private List<Pair<DataSegment, Boolean>> getDataSegments(
- final String dataSource,
- final Collection<String> segmentIds,
- final Handle handle
- )
+ private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll)
{
- return segmentIds.stream().map(
- segmentId -> Optional.ofNullable(
- handle.createQuery(
- StringUtils.format(
- "SELECT used, payload FROM %1$s WHERE dataSource =
:dataSource AND id = :id",
- getSegmentsTable()
- )
- )
- .bind("dataSource", dataSource)
- .bind("id", segmentId)
- .map(this::usedPayloadMapper)
- .first()
- )
- .orElseThrow(() -> new
UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]",
segmentId)))
- )
- .collect(Collectors.toList());
- }
-
- /**
- * Builds a VersionedIntervalTimeline containing used segments that overlap
the intervals passed.
- */
- private VersionedIntervalTimeline<String, DataSegment>
buildVersionedIntervalTimeline(
- final String dataSource,
- final Collection<Interval> intervals,
- final Handle handle
- )
- {
- return VersionedIntervalTimeline.forSegments(intervals
- .stream()
- .flatMap(interval -> handle.createQuery(
- StringUtils.format(
- "SELECT payload FROM %1$s WHERE dataSource = :dataSource
AND start < :end AND %2$send%2$s > :start AND used = true",
- getSegmentsTable(),
- connector.getQuoteString()
- )
- )
- .setFetchSize(connector.getStreamingFetchSize())
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map((i, resultSet, context) -> {
- try {
- return jsonMapper.readValue(resultSet.getBytes("payload"),
DataSegment.class);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .list()
- .stream()
- )
- .iterator()
- );
+ try {
+ poll();
+ onDemandPoll.pollCompletionFuture.complete(null);
+ }
+ catch (Throwable t) {
+ onDemandPoll.pollCompletionFuture.completeExceptionally(t);
+ throw t;
+ }
}
@Override
- public boolean enableDataSource(final String dataSource)
+ public boolean markSegmentAsUsed(final String segmentId)
{
try {
- return enableSegments(dataSource, Intervals.ETERNITY) != 0;
+ int numUpdatedDatabaseEntries = connector.getDBI().withHandle(
+ (Handle handle) -> handle
+ .createStatement(StringUtils.format("UPDATE %s SET used=true
WHERE id = :id", getSegmentsTable()))
+ .bind("id", segmentId)
+ .execute()
+ );
+ // Unlike bulk markAsUsed methods:
markAsUsedAllNonOvershadowedSegmentsInDataSource(),
+ // markAsUsedNonOvershadowedSegmentsInInterval(), and
markAsUsedNonOvershadowedSegments() we don't put the marked
+ // segment into the respective data source, because we don't have it
fetched from the database. It's probably not
+ // worth complicating the implementation and making two database queries
just to add the segment because it will
+ // be anyway fetched during the next poll(). Segment putting that is
done in the bulk markAsUsed methods is a nice
+ // to have thing, but doesn't formally affect the external guarantees
of the SegmentsMetadata class.
+ return numUpdatedDatabaseEntries > 0;
}
- catch (Exception e) {
- log.error(e, "Exception enabling datasource %s", dataSource);
- return false;
+ catch (RuntimeException e) {
+ log.error(e, "Exception marking segment %s as used", segmentId);
+ throw e;
}
}
@Override
- public int enableSegments(final String dataSource, final Interval interval)
+ public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String
dataSource)
{
- List<Pair<DataSegment, Boolean>> segments =
getDataSegmentsOverlappingInterval(dataSource, interval);
- List<DataSegment> segmentsToEnable = segments.stream()
- .filter(segment -> !segment.rhs &&
interval.contains(segment.lhs.getInterval()))
- .map(segment -> segment.lhs)
- .collect(Collectors.toList());
-
- VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline =
VersionedIntervalTimeline.forSegments(
- segments.stream().filter(segment -> segment.rhs).map(segment ->
segment.lhs).iterator()
- );
- VersionedIntervalTimeline.addSegments(versionedIntervalTimeline,
segmentsToEnable.iterator());
-
- return enableSegments(
- segmentsToEnable,
- versionedIntervalTimeline
- );
+ return doMarkAsUsedNonOvershadowedSegments(dataSource, null);
}
@Override
- public int enableSegments(final String dataSource, final Collection<String>
segmentIds)
+ public int markAsUsedNonOvershadowedSegmentsInInterval(final String
dataSource, final Interval interval)
{
- Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>>
data = connector.inReadOnlyTransaction(
- (handle, status) -> {
- List<DataSegment> segments = getDataSegments(dataSource, segmentIds,
handle)
- .stream()
- .filter(pair -> !pair.rhs)
- .map(pair -> pair.lhs)
- .collect(Collectors.toList());
-
- VersionedIntervalTimeline<String, DataSegment>
versionedIntervalTimeline = buildVersionedIntervalTimeline(
- dataSource,
- JodaUtils.condenseIntervals(segments.stream().map(segment ->
segment.getInterval()).collect(Collectors.toList())),
- handle
- );
- VersionedIntervalTimeline.addSegments(versionedIntervalTimeline,
segments.iterator());
+ Preconditions.checkNotNull(interval);
+ return doMarkAsUsedNonOvershadowedSegments(dataSource, interval);
+ }
- return new Pair<>(
- segments,
- versionedIntervalTimeline
- );
+ /**
+ * Implementation for both {@link
#markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is
null)
+ * and {@link #markAsUsedNonOvershadowedSegmentsInInterval}.
+ */
+ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName,
@Nullable Interval interval)
+ {
+ List<DataSegment> usedSegmentsOverlappingInterval = new ArrayList<>();
+ List<DataSegment> unusedSegmentsInInterval = new ArrayList<>();
+ connector.inReadOnlyTransaction(
+ (handle, status) -> {
+ String queryString =
+ StringUtils.format("SELECT used, payload FROM %1$s WHERE
dataSource = :dataSource", getSegmentsTable());
+ if (interval != null) {
+ queryString += StringUtils.format(" AND start < :end AND
%1$send%1$s > :start", connector.getQuoteString());
+ }
+ Query<?> query = handle
+ .createQuery(queryString)
+ .setFetchSize(connector.getStreamingFetchSize())
+ .bind("dataSource", dataSourceName);
+ if (interval != null) {
+ query = query
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString());
+ }
+ query = query
+ .map((int index, ResultSet resultSet, StatementContext context)
-> {
+ try {
+ DataSegment segment =
jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
+ if (resultSet.getBoolean("used")) {
+ usedSegmentsOverlappingInterval.add(segment);
+ } else {
+ if (interval == null ||
interval.contains(segment.getInterval())) {
+ unusedSegmentsInInterval.add(segment);
+ }
+ }
+ return null;
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // Consume the query results to ensure
usedSegmentsOverlappingInterval and unusedSegmentsInInterval are
+ // populated.
+ consume(query.iterator());
+ return null;
}
);
- return enableSegments(
- data.lhs,
- data.rhs
+ VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline =
VersionedIntervalTimeline.forSegments(
+ Iterators.concat(usedSegmentsOverlappingInterval.iterator(),
unusedSegmentsInInterval.iterator())
);
+
+ return markNonOvershadowedSegmentsAsUsed(dataSourceName,
unusedSegmentsInInterval, versionedIntervalTimeline);
}
- private int enableSegments(
- final Collection<DataSegment> segments,
- final VersionedIntervalTimeline<String, DataSegment>
versionedIntervalTimeline
+ private static void consume(Iterator<?> iterator)
+ {
+ while (iterator.hasNext()) {
+ iterator.next();
+ }
+ }
+
+ /** Also puts non-overshadowed segments into {@link #dataSources}. */
+ private int markNonOvershadowedSegmentsAsUsed(
+ String dataSourceName,
+ List<DataSegment> unusedSegments,
+ VersionedIntervalTimeline<String, DataSegment> timeline
)
{
- if (segments.isEmpty()) {
- log.warn("No segments found to update!");
- return 0;
+ @Nullable
+ DruidDataSource dataSource = null;
+ if (dataSources != null) {
+ dataSource = dataSources.computeIfAbsent(
+ dataSourceName,
+ dsName -> new DruidDataSource(dsName,
createDefaultDataSourceProperties())
+ );
+ }
+ List<String> segmentIdsToMarkAsUsed = new ArrayList<>();
+ for (DataSegment segment : unusedSegments) {
+ if (timeline.isOvershadowed(segment.getInterval(),
segment.getVersion())) {
+ continue;
+ }
+ if (dataSource != null) {
+ dataSource.addSegment(segment);
+ }
+ String s = segment.getId().toString();
+ segmentIdsToMarkAsUsed.add(s);
Review comment:
nit: rename `s` to `segmentId`, or get rid of `s` and do it inline:
`segmentIdsToMarkAsUsed.add(segment.getId().toString())`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]