leventov commented on a change in pull request #7653: Refactor
SQLMetadataSegmentManager; Change contract of REST methods in
DataSourcesResource
URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r287358309
##########
File path:
server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
##########
@@ -197,366 +320,489 @@ private Runnable createPollTaskForStartOrder(long
startOrder)
}
@Override
- @LifecycleStop
- public void stop()
+ public void stopPollingDatabasePeriodically()
{
- ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
lock.lock();
try {
- if (!isStarted()) {
+ if (!isPollingDatabasePeriodically()) {
return;
}
- dataSources = null;
- currentStartOrder = -1;
- exec.shutdownNow();
- exec = null;
+ periodicPollTaskFuture.cancel(false);
+ latestDatabasePoll = null;
+
+ // NOT nulling dataSources, allowing to query the latest polled data
even when this SegmentsMetadata object is
+ // stopped.
+
+ currentStartPollingOrder = -1;
}
finally {
lock.unlock();
}
}
- private Pair<DataSegment, Boolean> usedPayloadMapper(
- final int index,
- final ResultSet resultSet,
- final StatementContext context
- ) throws SQLException
+ private void awaitOrPerformDatabasePoll()
{
+ // Double-checked locking with awaitPeriodicOrFreshOnDemandDatabasePoll()
call playing the role of the "check".
+ if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+ return;
+ }
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+ lock.lock();
try {
- return new Pair<>(
- jsonMapper.readValue(resultSet.getBytes("payload"),
DataSegment.class),
- resultSet.getBoolean("used")
- );
+ if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+ return;
+ }
+ OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll();
+ this.latestDatabasePoll = newOnDemandUpdate;
+ doOnDemandPoll(newOnDemandUpdate);
}
- catch (IOException e) {
- throw new RuntimeException(e);
+ finally {
+ lock.unlock();
}
}
- /**
- * Gets a list of all datasegments that overlap the provided interval along
with thier used status.
- */
- private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
- final String dataSource,
- final Interval interval
- )
- {
- return connector.inReadOnlyTransaction(
- (handle, status) -> handle.createQuery(
- StringUtils.format(
- "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource
AND start < :end AND %2$send%2$s > :start",
- getSegmentsTable(),
- connector.getQuoteString()
- )
- )
- .setFetchSize(connector.getStreamingFetchSize())
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map(this::usedPayloadMapper)
- .list()
- );
- }
-
- private List<Pair<DataSegment, Boolean>> getDataSegments(
- final String dataSource,
- final Collection<String> segmentIds,
- final Handle handle
- )
+ private boolean awaitPeriodicOrFreshOnDemandDatabasePoll()
{
- return segmentIds.stream().map(
- segmentId -> Optional.ofNullable(
- handle.createQuery(
- StringUtils.format(
- "SELECT used, payload FROM %1$s WHERE dataSource =
:dataSource AND id = :id",
- getSegmentsTable()
- )
- )
- .bind("dataSource", dataSource)
- .bind("id", segmentId)
- .map(this::usedPayloadMapper)
- .first()
- )
- .orElseThrow(() -> new
UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]",
segmentId)))
- )
- .collect(Collectors.toList());
+ DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
+ if (latestDatabasePoll instanceof PeriodicDatabasePoll) {
+ Futures.getUnchecked(((PeriodicDatabasePoll)
latestDatabasePoll).firstPollCompletionFuture);
+ return true;
+ }
+ if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
+ long periodicPollDelayNanos =
TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
+ OnDemandDatabasePoll latestOnDemandUpdate = (OnDemandDatabasePoll)
latestDatabasePoll;
+ boolean latestUpdateIsFresh =
latestOnDemandUpdate.nanosElapsedFromInitiation() < periodicPollDelayNanos;
+ if (latestUpdateIsFresh) {
+ Futures.getUnchecked(latestOnDemandUpdate.pollCompletionFuture);
+ return true;
+ }
+ // Latest on-demand update is not fresh. Fall through to return false
from this method.
+ } else {
+ assert latestDatabasePoll == null;
+ }
+ return false;
}
- /**
- * Builds a VersionedIntervalTimeline containing used segments that overlap
the intervals passed.
- */
- private VersionedIntervalTimeline<String, DataSegment>
buildVersionedIntervalTimeline(
- final String dataSource,
- final Collection<Interval> intervals,
- final Handle handle
- )
+ private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll)
{
- return VersionedIntervalTimeline.forSegments(intervals
- .stream()
- .flatMap(interval -> handle.createQuery(
- StringUtils.format(
- "SELECT payload FROM %1$s WHERE dataSource = :dataSource
AND start < :end AND %2$send%2$s > :start AND used = true",
- getSegmentsTable(),
- connector.getQuoteString()
- )
- )
- .setFetchSize(connector.getStreamingFetchSize())
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map((i, resultSet, context) -> {
- try {
- return jsonMapper.readValue(resultSet.getBytes("payload"),
DataSegment.class);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .list()
- .stream()
- )
- .iterator()
- );
+ try {
+ poll();
+ onDemandPoll.pollCompletionFuture.complete(null);
+ }
+ catch (Throwable t) {
+ onDemandPoll.pollCompletionFuture.completeExceptionally(t);
+ throw t;
+ }
}
@Override
- public boolean enableDataSource(final String dataSource)
+ public boolean markSegmentAsUsed(final String segmentId)
{
try {
- return enableSegments(dataSource, Intervals.ETERNITY) != 0;
+ int numUpdatedDatabaseEntries = connector.getDBI().withHandle(
+ (Handle handle) -> handle
+ .createStatement(StringUtils.format("UPDATE %s SET used=true
WHERE id = :id", getSegmentsTable()))
+ .bind("id", segmentId)
+ .execute()
+ );
+ // Unlike bulk markAsUsed methods:
markAsUsedAllNonOvershadowedSegmentsInDataSource(),
+ // markAsUsedNonOvershadowedSegmentsInInterval(), and
markAsUsedNonOvershadowedSegments() we don't put the marked
+ // segment into the respective data source, because we don't have it
fetched from the database. It's probably not
+ // worth complicating the implementation and making two database queries
just to add the segment because it will
+ // be anyway fetched during the next poll(). Segment putting that is
done in the bulk markAsUsed methods is a nice
+ // to have thing, but doesn't formally affects the external guarantees
of SegmentsMetadata class.
+ return numUpdatedDatabaseEntries > 0;
}
- catch (Exception e) {
- log.error(e, "Exception enabling datasource %s", dataSource);
- return false;
+ catch (RuntimeException e) {
+ log.error(e, "Exception marking segment %s as used", segmentId);
+ throw e;
}
}
@Override
- public int enableSegments(final String dataSource, final Interval interval)
+ public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String
dataSource)
{
- List<Pair<DataSegment, Boolean>> segments =
getDataSegmentsOverlappingInterval(dataSource, interval);
- List<DataSegment> segmentsToEnable = segments.stream()
- .filter(segment -> !segment.rhs &&
interval.contains(segment.lhs.getInterval()))
- .map(segment -> segment.lhs)
- .collect(Collectors.toList());
-
- VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline =
VersionedIntervalTimeline.forSegments(
- segments.stream().filter(segment -> segment.rhs).map(segment ->
segment.lhs).iterator()
- );
- VersionedIntervalTimeline.addSegments(versionedIntervalTimeline,
segmentsToEnable.iterator());
-
- return enableSegments(
- segmentsToEnable,
- versionedIntervalTimeline
- );
+ return doMarkAsUsedNonOvershadowedSegments(dataSource, null);
}
@Override
- public int enableSegments(final String dataSource, final Collection<String>
segmentIds)
+ public int markAsUsedNonOvershadowedSegmentsInInterval(final String
dataSource, final Interval interval)
{
- Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>>
data = connector.inReadOnlyTransaction(
- (handle, status) -> {
- List<DataSegment> segments = getDataSegments(dataSource, segmentIds,
handle)
- .stream()
- .filter(pair -> !pair.rhs)
- .map(pair -> pair.lhs)
- .collect(Collectors.toList());
-
- VersionedIntervalTimeline<String, DataSegment>
versionedIntervalTimeline = buildVersionedIntervalTimeline(
- dataSource,
- JodaUtils.condenseIntervals(segments.stream().map(segment ->
segment.getInterval()).collect(Collectors.toList())),
- handle
- );
- VersionedIntervalTimeline.addSegments(versionedIntervalTimeline,
segments.iterator());
+ Preconditions.checkNotNull(interval);
+ return doMarkAsUsedNonOvershadowedSegments(dataSource, interval);
+ }
- return new Pair<>(
- segments,
- versionedIntervalTimeline
- );
+ /**
+ * Implementation for both {@link
#markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is
null)
+ * and {@link #markAsUsedNonOvershadowedSegmentsInInterval}.
+ */
+ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName,
@Nullable Interval interval)
+ {
+ List<DataSegment> usedSegmentsOverlappingInterval = new ArrayList<>();
+ List<DataSegment> unusedSegmentsInInterval = new ArrayList<>();
+ connector.inReadOnlyTransaction(
+ (handle, status) -> {
+ String queryString =
+ StringUtils.format("SELECT used, payload FROM %1$s WHERE
dataSource = :dataSource", getSegmentsTable());
+ if (interval != null) {
+ queryString += StringUtils.format(" AND start < :end AND
%1$send%1$s > :start", connector.getQuoteString());
+ }
+ Query<?> query = handle
+ .createQuery(queryString)
+ .setFetchSize(connector.getStreamingFetchSize())
+ .bind("dataSource", dataSourceName);
+ if (interval != null) {
+ query = query
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString());
+ }
+ query = query
+ .map((int index, ResultSet resultSet, StatementContext context)
-> {
+ try {
+ DataSegment segment =
jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
+ if (resultSet.getBoolean("used")) {
+ usedSegmentsOverlappingInterval.add(segment);
+ } else {
+ if (interval == null ||
interval.contains(segment.getInterval())) {
+ unusedSegmentsInInterval.add(segment);
+ }
+ }
+ return null;
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // Consume the query results to ensure
usedSegmentsOverlappingInterval and unusedSegmentsInInterval are
+ // populated.
+ consume(query.iterator());
+ return null;
}
);
- return enableSegments(
- data.lhs,
- data.rhs
+ VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline =
VersionedIntervalTimeline.forSegments(
+ Iterators.concat(usedSegmentsOverlappingInterval.iterator(),
unusedSegmentsInInterval.iterator())
);
+
+ return markNonOvershadowedSegmentsAsUsed(dataSourceName,
unusedSegmentsInInterval, versionedIntervalTimeline);
+ }
+
+ private static void consume(Iterator<?> iterator)
+ {
+ while (iterator.hasNext()) {
+ iterator.next();
+ }
}
- private int enableSegments(
- final Collection<DataSegment> segments,
- final VersionedIntervalTimeline<String, DataSegment>
versionedIntervalTimeline
+ /** Also puts non-overshadowed segments into {@link #dataSources}. */
+ private int markNonOvershadowedSegmentsAsUsed(
+ String dataSourceName,
+ List<DataSegment> unusedSegments,
+ VersionedIntervalTimeline<String, DataSegment> timeline
)
{
- if (segments.isEmpty()) {
- log.warn("No segments found to update!");
- return 0;
+ @Nullable
+ DruidDataSource dataSource = null;
+ if (dataSources != null) {
+ dataSource = dataSources.computeIfAbsent(
+ dataSourceName,
+ dsName -> new DruidDataSource(dsName,
createDefaultDataSourceProperties())
+ );
+ }
+ List<String> segmentIdsToMarkAsUsed = new ArrayList<>();
+ for (DataSegment segment : unusedSegments) {
+ if (timeline.isOvershadowed(segment.getInterval(),
segment.getVersion())) {
+ continue;
+ }
+ if (dataSource != null) {
+ dataSource.addSegment(segment);
+ }
+ String s = segment.getId().toString();
+ segmentIdsToMarkAsUsed.add(s);
}
- return connector.getDBI().withHandle(handle -> {
- Batch batch = handle.createBatch();
- segments
- .stream()
- .map(segment -> segment.getId())
- .filter(segmentId -> !versionedIntervalTimeline.isOvershadowed(
- segmentId.getInterval(),
- segmentId.getVersion()
- ))
- .forEach(segmentId -> batch.add(
- StringUtils.format(
- "UPDATE %s SET used=true WHERE id = '%s'",
- getSegmentsTable(),
- segmentId
- )
- ));
- return batch.execute().length;
- });
+ return markSegmentsAsUsed(segmentIdsToMarkAsUsed);
}
@Override
- public boolean enableSegment(final String segmentId)
+ public int markAsUsedNonOvershadowedSegments(final String dataSource, final
Set<String> segmentIds)
+ throws UnknownSegmentIdException
{
try {
- connector.getDBI().withHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle)
- {
- handle.createStatement(StringUtils.format("UPDATE %s SET
used=true WHERE id = :id", getSegmentsTable()))
- .bind("id", segmentId)
- .execute();
- return null;
- }
- }
- );
+ Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>>
unusedSegmentsAndTimeline = connector
+ .inReadOnlyTransaction(
+ (handle, status) -> {
+ List<DataSegment> unusedSegments =
retreiveUnusedSegments(dataSource, segmentIds, handle);
+ List<Interval> unusedSegmentsIntervals =
JodaUtils.condenseIntervals(
+
unusedSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+ );
+ Iterator<DataSegment>
usedSegmentsOverlappingUnusedSegmentsIntervals =
+ retreiveUsedSegmentsOverlappingIntervals(dataSource,
unusedSegmentsIntervals, handle);
+ VersionedIntervalTimeline<String, DataSegment> timeline =
VersionedIntervalTimeline.forSegments(
+
Iterators.concat(usedSegmentsOverlappingUnusedSegmentsIntervals,
unusedSegments.iterator())
+ );
+ return new Pair<>(unusedSegments, timeline);
+ }
+ );
+
+ List<DataSegment> unusedSegments = unusedSegmentsAndTimeline.lhs;
+ VersionedIntervalTimeline<String, DataSegment> timeline =
unusedSegmentsAndTimeline.rhs;
+ return markNonOvershadowedSegmentsAsUsed(dataSource, unusedSegments,
timeline);
}
catch (Exception e) {
- log.error(e, "Exception enabling segment %s", segmentId);
- return false;
+ Throwable rootCause = Throwables.getRootCause(e);
+ if (rootCause instanceof UnknownSegmentIdException) {
+ throw (UnknownSegmentIdException) rootCause;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ private List<DataSegment> retreiveUnusedSegments(
Review comment:
Thanks, fixed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]