surekhasaharan commented on a change in pull request #7490: Add reload by
interval API
URL: https://github.com/apache/incubator-druid/pull/7490#discussion_r276878309
##########
File path:
server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
##########
@@ -219,82 +219,111 @@ public void stop()
}
}
+ private VersionedIntervalTimeline<String, DataSegment>
getVersionedIntervalTimeline(final String dataSource)
+ {
+ return connector.inReadOnlyTransaction(
+ (handle, status) -> VersionedIntervalTimeline.forSegments(
+ Iterators.transform(
+ handle
+ .createQuery(
+ StringUtils.format(
+ "SELECT payload FROM %s WHERE dataSource =
:dataSource",
+ getSegmentsTable()
+ )
+ )
+ .setFetchSize(connector.getStreamingFetchSize())
+ .bind("dataSource", dataSource)
+ .map(ByteArrayMapper.FIRST)
+ .iterator(),
+ payload -> {
+ try {
+ return jsonMapper.readValue(payload, DataSegment.class);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ )
+
+ )
+ );
+ }
+
+ private Stream<SegmentId> segmentIdsForInterval(
+ final VersionedIntervalTimeline<String, DataSegment>
versionedIntervalTimeline,
+ final Interval interval
+ )
+ {
+ return versionedIntervalTimeline.lookup(interval).stream().flatMap(
+ objectHolder ->
StreamSupport.stream(objectHolder.getObject().spliterator(), false).map(
+ dataSegmentPartitionChunk ->
dataSegmentPartitionChunk.getObject().getId()
+ )
+ );
+ }
+
@Override
public boolean enableDataSource(final String dataSource)
{
try {
- final IDBI dbi = connector.getDBI();
- VersionedIntervalTimeline<String, DataSegment> segmentTimeline =
connector.inReadOnlyTransaction(
- (handle, status) -> VersionedIntervalTimeline.forSegments(
- Iterators.transform(
- handle
- .createQuery(
- StringUtils.format(
- "SELECT payload FROM %s WHERE dataSource =
:dataSource",
- getSegmentsTable()
- )
- )
- .setFetchSize(connector.getStreamingFetchSize())
- .bind("dataSource", dataSource)
- .map(ByteArrayMapper.FIRST)
- .iterator(),
- payload -> {
- try {
- return jsonMapper.readValue(payload, DataSegment.class);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- )
-
- )
- );
+ return enableSegments(dataSource, FOREVER);
+ }
+ catch (Exception e) {
+ log.error(e, "Exception enabling datasource %s", dataSource);
+ return false;
+ }
+ }
- final List<DataSegment> segments = new ArrayList<>();
- List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders =
segmentTimeline.lookup(
- Intervals.of("0000-01-01/3000-01-01")
- );
- for (TimelineObjectHolder<String, DataSegment> objectHolder :
timelineObjectHolders) {
- for (PartitionChunk<DataSegment> partitionChunk :
objectHolder.getObject()) {
- segments.add(partitionChunk.getObject());
- }
- }
+ @Override
+ public boolean enableSegments(final String dataSource, final Interval
interval)
+ {
+ VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline =
getVersionedIntervalTimeline(dataSource);
- if (segments.isEmpty()) {
- log.warn("No segments found in the database!");
- return false;
- }
+ return enableSegments(
+ segmentIdsForInterval(versionedIntervalTimeline,
interval).collect(Collectors.toSet()),
+ versionedIntervalTimeline
+ );
+ }
- dbi.withHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle)
- {
- Batch batch = handle.createBatch();
+ @Override
+ public boolean enableSegments(final String dataSource, final
Collection<String> segmentIds)
+ {
+ VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline =
getVersionedIntervalTimeline(dataSource);
- for (DataSegment segment : segments) {
- batch.add(
- StringUtils.format(
- "UPDATE %s SET used=true WHERE id = '%s'",
- getSegmentsTable(),
- segment.getId()
- )
- );
- }
- batch.execute();
+ return enableSegments(
+ segmentIdsForInterval(versionedIntervalTimeline, FOREVER)
+ .filter(segmentId -> segmentIds.contains(segmentId.toString()))
+ .collect(Collectors.toSet()),
+ versionedIntervalTimeline
+ );
+ }
- return null;
- }
- }
- );
- }
- catch (Exception e) {
- log.error(e, "Exception enabling datasource %s", dataSource);
+ private boolean enableSegments(
+ final Collection<SegmentId> segmentIds,
+ final VersionedIntervalTimeline<String, DataSegment>
versionedIntervalTimeline
+ )
+ {
+ if (segmentIds.isEmpty()) {
Review comment:
I think it's okay to keep the warning log for the case where no segments are
found for the interval in the DB.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]