surekhasaharan commented on a change in pull request #7653: Refactor SQLMetadataSegmentManager; Change contract of REST methods in DataSourcesResource URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r287892431
########## File path: server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java ########## @@ -197,366 +340,511 @@ private Runnable createPollTaskForStartOrder(long startOrder) } @Override - @LifecycleStop - public void stop() + public boolean isPollingDatabasePeriodically() { - ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock(); + // isPollingDatabasePeriodically() is synchronized together with startPollingDatabasePeriodically(), + // stopPollingDatabasePeriodically() and poll() to ensure that the latest currentStartPollingOrder is always + // visible. readLock should be used to avoid unexpected performance degradation of DruidCoordinator. + ReentrantReadWriteLock.ReadLock lock = startStopPollLock.readLock(); lock.lock(); try { - if (!isStarted()) { + return currentStartPollingOrder >= 0; + } + finally { + lock.unlock(); + } + } + + @Override + public void stopPollingDatabasePeriodically() + { + ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock(); + lock.lock(); + try { + if (!isPollingDatabasePeriodically()) { return; } - dataSources = null; - currentStartOrder = -1; - exec.shutdownNow(); - exec = null; + periodicPollTaskFuture.cancel(false); + latestDatabasePoll = null; + + // NOT nulling dataSources, allowing to query the latest polled data even when this SegmentsMetadata object is + // stopped. + + currentStartPollingOrder = -1; } finally { lock.unlock(); } } - private Pair<DataSegment, Boolean> usedPayloadMapper( - final int index, - final ResultSet resultSet, - final StatementContext context - ) throws SQLException + private void awaitOrPerformDatabasePoll() { + // Double-checked locking with awaitLatestDatabasePoll() call playing the role of the "check". 
+ if (awaitLatestDatabasePoll()) { + return; + } + ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock(); + lock.lock(); try { - return new Pair<>( - jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class), - resultSet.getBoolean("used") - ); + if (awaitLatestDatabasePoll()) { + return; + } + OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll(); + this.latestDatabasePoll = newOnDemandUpdate; + doOnDemandPoll(newOnDemandUpdate); } - catch (IOException e) { - throw new RuntimeException(e); + finally { + lock.unlock(); } } /** - * Gets a list of all datasegments that overlap the provided interval along with thier used status. + * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is + * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise, + * meaning that a new on-demand database poll should be initiated. */ - private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval( - final String dataSource, - final Interval interval - ) + private boolean awaitLatestDatabasePoll() { - return connector.inReadOnlyTransaction( - (handle, status) -> handle.createQuery( - StringUtils.format( - "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start", - getSegmentsTable(), - connector.getQuoteString() - ) - ) - .setFetchSize(connector.getStreamingFetchSize()) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(this::usedPayloadMapper) - .list() - ); + DatabasePoll latestDatabasePoll = this.latestDatabasePoll; + if (latestDatabasePoll instanceof PeriodicDatabasePoll) { + Futures.getUnchecked(((PeriodicDatabasePoll) latestDatabasePoll).firstPollCompletionFuture); + return true; + } + if (latestDatabasePoll instanceof OnDemandDatabasePoll) { + long periodicPollDelayNanos = 
TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis()); + OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) latestDatabasePoll; + boolean latestUpdateIsFresh = latestOnDemandPoll.nanosElapsedFromInitiation() < periodicPollDelayNanos; + if (latestUpdateIsFresh) { + Futures.getUnchecked(latestOnDemandPoll.pollCompletionFuture); + return true; + } + // Latest on-demand update is not fresh. Fall through to return false from this method. + } else { + assert latestDatabasePoll == null; + // No periodic updates and no on-demand database poll have been done yet, nothing to await for. + } + return false; } - private List<Pair<DataSegment, Boolean>> getDataSegments( - final String dataSource, - final Collection<String> segmentIds, - final Handle handle - ) + private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll) { - return segmentIds.stream().map( - segmentId -> Optional.ofNullable( - handle.createQuery( - StringUtils.format( - "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND id = :id", - getSegmentsTable() - ) - ) - .bind("dataSource", dataSource) - .bind("id", segmentId) - .map(this::usedPayloadMapper) - .first() - ) - .orElseThrow(() -> new UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]", segmentId))) - ) - .collect(Collectors.toList()); - } - - /** - * Builds a VersionedIntervalTimeline containing used segments that overlap the intervals passed. 
- */ - private VersionedIntervalTimeline<String, DataSegment> buildVersionedIntervalTimeline( - final String dataSource, - final Collection<Interval> intervals, - final Handle handle - ) - { - return VersionedIntervalTimeline.forSegments(intervals - .stream() - .flatMap(interval -> handle.createQuery( - StringUtils.format( - "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start AND used = true", - getSegmentsTable(), - connector.getQuoteString() - ) - ) - .setFetchSize(connector.getStreamingFetchSize()) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map((i, resultSet, context) -> { - try { - return jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class); - } - catch (IOException e) { - throw new RuntimeException(e); - } - }) - .list() - .stream() - ) - .iterator() - ); + try { + poll(); + onDemandPoll.pollCompletionFuture.complete(null); + } + catch (Throwable t) { + onDemandPoll.pollCompletionFuture.completeExceptionally(t); + throw t; + } } @Override - public boolean enableDataSource(final String dataSource) + public boolean markSegmentAsUsed(final String segmentId) { try { - return enableSegments(dataSource, Intervals.ETERNITY) != 0; + int numUpdatedDatabaseEntries = connector.getDBI().withHandle( + (Handle handle) -> handle + .createStatement(StringUtils.format("UPDATE %s SET used=true WHERE id = :id", getSegmentsTable())) + .bind("id", segmentId) + .execute() + ); + // Unlike bulk markAsUsed methods: markAsUsedAllNonOvershadowedSegmentsInDataSource(), + // markAsUsedNonOvershadowedSegmentsInInterval(), and markAsUsedNonOvershadowedSegments() we don't put the marked + // segment into the respective data source, because we don't have it fetched from the database. 
It's probably not + // worth complicating the implementation and making two database queries just to add the segment because it will + // be anyway fetched during the next poll(). Segment putting that is done in the bulk markAsUsed methods is a nice + // to have thing, but doesn't formally affects the external guarantees of SegmentsMetadata class. + return numUpdatedDatabaseEntries > 0; } - catch (Exception e) { - log.error(e, "Exception enabling datasource %s", dataSource); - return false; + catch (RuntimeException e) { + log.error(e, "Exception marking segment %s as used", segmentId); + throw e; } } @Override - public int enableSegments(final String dataSource, final Interval interval) + public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String dataSource) { - List<Pair<DataSegment, Boolean>> segments = getDataSegmentsOverlappingInterval(dataSource, interval); - List<DataSegment> segmentsToEnable = segments.stream() - .filter(segment -> !segment.rhs && interval.contains(segment.lhs.getInterval())) - .map(segment -> segment.lhs) - .collect(Collectors.toList()); - - VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = VersionedIntervalTimeline.forSegments( - segments.stream().filter(segment -> segment.rhs).map(segment -> segment.lhs).iterator() - ); - VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segmentsToEnable.iterator()); - - return enableSegments( - segmentsToEnable, - versionedIntervalTimeline - ); + return doMarkAsUsedNonOvershadowedSegments(dataSource, null); } @Override - public int enableSegments(final String dataSource, final Collection<String> segmentIds) + public int markAsUsedNonOvershadowedSegmentsInInterval(final String dataSource, final Interval interval) { - Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>> data = connector.inReadOnlyTransaction( - (handle, status) -> { - List<DataSegment> segments = getDataSegments(dataSource, segmentIds, handle) - .stream() - 
.filter(pair -> !pair.rhs) - .map(pair -> pair.lhs) - .collect(Collectors.toList()); - - VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = buildVersionedIntervalTimeline( - dataSource, - JodaUtils.condenseIntervals(segments.stream().map(segment -> segment.getInterval()).collect(Collectors.toList())), - handle - ); - VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segments.iterator()); + Preconditions.checkNotNull(interval); + return doMarkAsUsedNonOvershadowedSegments(dataSource, interval); + } - return new Pair<>( - segments, - versionedIntervalTimeline - ); + /** + * Implementation for both {@link #markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is null) + * and {@link #markAsUsedNonOvershadowedSegmentsInInterval}. + */ + private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval) + { + List<DataSegment> usedSegmentsOverlappingInterval = new ArrayList<>(); + List<DataSegment> unusedSegmentsInInterval = new ArrayList<>(); + connector.inReadOnlyTransaction( + (handle, status) -> { + String queryString = + StringUtils.format("SELECT used, payload FROM %1$s WHERE dataSource = :dataSource", getSegmentsTable()); + if (interval != null) { + queryString += StringUtils.format(" AND start < :end AND %1$send%1$s > :start", connector.getQuoteString()); + } + Query<?> query = handle + .createQuery(queryString) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("dataSource", dataSourceName); + if (interval != null) { + query = query + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()); + } + query = query + .map((int index, ResultSet resultSet, StatementContext context) -> { + try { + DataSegment segment = jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class); + if (resultSet.getBoolean("used")) { + usedSegmentsOverlappingInterval.add(segment); + } else { + if (interval == null || 
interval.contains(segment.getInterval())) { + unusedSegmentsInInterval.add(segment); + } + } + return null; + } + catch (IOException e) { + throw new RuntimeException(e); + } + }); + // Consume the query results to ensure usedSegmentsOverlappingInterval and unusedSegmentsInInterval are + // populated. + consume(query.iterator()); + return null; } ); - return enableSegments( - data.lhs, - data.rhs + VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = VersionedIntervalTimeline.forSegments( + Iterators.concat(usedSegmentsOverlappingInterval.iterator(), unusedSegmentsInInterval.iterator()) ); + + return markNonOvershadowedSegmentsAsUsed(dataSourceName, unusedSegmentsInInterval, versionedIntervalTimeline); } - private int enableSegments( - final Collection<DataSegment> segments, - final VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline + private static void consume(Iterator<?> iterator) + { + while (iterator.hasNext()) { + iterator.next(); + } + } + + /** Also puts non-overshadowed segments into {@link #dataSources}. 
*/ + private int markNonOvershadowedSegmentsAsUsed( String dataSourceName, List<DataSegment> unusedSegments, VersionedIntervalTimeline<String, DataSegment> timeline ) { - if (segments.isEmpty()) { - log.warn("No segments found to update!"); - return 0; + @Nullable + DruidDataSource dataSource = null; + if (dataSources != null) { + dataSource = dataSources.computeIfAbsent( + dataSourceName, + dsName -> new DruidDataSource(dsName, createDefaultDataSourceProperties()) + ); + } + List<String> segmentIdsToMarkAsUsed = new ArrayList<>(); + for (DataSegment segment : unusedSegments) { + if (timeline.isOvershadowed(segment.getInterval(), segment.getVersion())) { + continue; + } + if (dataSource != null) { + dataSource.addSegment(segment); + } + String s = segment.getId().toString(); + segmentIdsToMarkAsUsed.add(s); Review comment: nit: rename `s` to `segmentId`, or get rid of `s` entirely and inline the call: `segmentIdsToMarkAsUsed.add(segment.getId().toString())` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org