jihoonson commented on a change in pull request #7048: Make 
IngestSegmentFirehoseFactory splittable for parallel ingestion
URL: https://github.com/apache/incubator-druid/pull/7048#discussion_r268328318
 
 

 ##########
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
 ##########
 @@ -251,6 +285,179 @@ private long jitter(long input)
     return retval < 0 ? 0 : retval;
   }
 
+  private List<TimelineObjectHolder<String, DataSegment>> getTimeline()
+  {
+    if (interval == null) {
+      return getTimelineForSegmentIds();
+    } else {
+      return getTimelineForInterval();
+    }
+  }
+
+  private List<TimelineObjectHolder<String, DataSegment>> 
getTimelineForInterval()
+  {
+    Preconditions.checkNotNull(interval);
+
+    // This call used to use the TaskActionClient, so for compatibility we use 
the same retry configuration
+    // as TaskActionClient.
+    final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
+    List<DataSegment> usedSegments;
+    while (true) {
+      try {
+        usedSegments =
+            coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, 
Collections.singletonList(interval));
+        break;
+      }
+      catch (Throwable e) {
+        log.warn(e, "Exception getting database segments");
+        final Duration delay = retryPolicy.getAndIncrementRetryDelay();
+        if (delay == null) {
+          throw e;
+        } else {
+          final long sleepTime = jitter(delay.getMillis());
+          log.info("Will try again in [%s].", new 
Duration(sleepTime).toString());
+          try {
+            Thread.sleep(sleepTime);
+          }
+          catch (InterruptedException e2) {
+            throw new RuntimeException(e2);
+          }
+        }
+      }
+    }
+
+    return 
VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval);
+  }
+
+  private List<TimelineObjectHolder<String, DataSegment>> 
getTimelineForSegmentIds()
+  {
+    final SortedMap<Interval, TimelineObjectHolder<String, DataSegment>> 
timeline = new TreeMap<>(
+        Comparators.intervalsByStartThenEnd()
+    );
+    for (WindowedSegmentId windowedSegmentId : 
Preconditions.checkNotNull(segmentIds)) {
+      final DataSegment segment = 
coordinatorClient.getDatabaseSegmentDataSourceSegment(
+          dataSource,
+          windowedSegmentId.getSegmentId()
+      );
+      for (Interval interval : windowedSegmentId.getIntervals()) {
+        final TimelineObjectHolder<String, DataSegment> existingHolder = 
timeline.get(interval);
+        if (existingHolder != null) {
+          if (!existingHolder.getVersion().equals(segment.getVersion())) {
+            throw new ISE("Timeline segments with the same interval should 
have the same version: " +
+                          "existing version[%s] vs new segment[%s]", 
existingHolder.getVersion(), segment);
+          }
+          
existingHolder.getObject().add(segment.getShardSpec().createChunk(segment));
+        } else {
+          timeline.put(interval, new TimelineObjectHolder<>(
+              interval,
+              segment.getInterval(),
+              segment.getVersion(),
+              new 
PartitionHolder<DataSegment>(segment.getShardSpec().createChunk(segment))
+          ));
+        }
+      }
+    }
+
+    // Validate that none of the given windows overlaps (except for when 
multiple segments share exactly the
+    // same interval).
+    Interval lastInterval = null;
+    for (Interval interval : timeline.keySet()) {
+      if (lastInterval != null) {
+        if (interval.overlaps(lastInterval)) {
+          throw new IAE(
+              "Distinct intervals in input segments may not overlap: [%s] vs 
[%s]",
+              lastInterval,
+              interval
+          );
+        }
+      }
+      lastInterval = interval;
+    }
+
+    return new ArrayList<>(timeline.values());
+  }
+
+  private void initializeSplitsIfNeeded()
+  {
+    if (splits != null) {
+      return;
+    }
+
+    // isSplittable() ensures this is only called when we have an interval.
+    final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = 
getTimelineForInterval();
+
+    // We do the simplest possible greedy algorithm here instead of anything 
cleverer. The general bin packing
+    // problem is NP-hard, and we'd like to get segments from the same 
interval into the same split so that their
+    // data can combine with each other anyway.
+
+    List<InputSplit<List<WindowedSegmentId>>> newSplits = new ArrayList<>();
+    List<WindowedSegmentId> currentSplit = new ArrayList<>();
+    Map<DataSegment, WindowedSegmentId> windowedSegmentIds = new HashMap<>();
+    long bytesInCurrentSplit = 0;
+    for (TimelineObjectHolder<String, DataSegment> timelineHolder : 
timelineSegments) {
+      for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
+        final DataSegment segment = chunk.getObject();
+        final WindowedSegmentId existingWindowedSegmentId = 
windowedSegmentIds.get(segment);
+        if (existingWindowedSegmentId != null) {
+          // We've already seen this segment in the timeline, so just add this 
interval to it. It has already
+          // been placed into a split.
+          
existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval());
+        } else {
+          // It's the first time we've seen this segment, so create a new 
WindowedSegmentId.
+          List<Interval> intervals = new ArrayList<>();
+          // Use the interval that contributes to the timeline, not the entire 
segment's true interval.
+          intervals.add(timelineHolder.getInterval());
+          final WindowedSegmentId newWindowedSegmentId = new 
WindowedSegmentId(segment.getId().toString(), intervals);
+          windowedSegmentIds.put(segment, newWindowedSegmentId);
+
+          // Now figure out if it goes in the current split or not.
+          final long segmentBytes = segment.getSize();
+          if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask 
&& !currentSplit.isEmpty()) {
+            // This segment won't fit in the current non-empty split, so this 
split is done.
+            newSplits.add(new InputSplit<>(currentSplit));
+            currentSplit = new ArrayList<>();
+            bytesInCurrentSplit = 0;
+          }
+          if (segmentBytes > maxInputSegmentBytesPerTask) {
 
 Review comment:
   > In practice I expect that most people's segments have uniform granularity 
(or at least are non-overlapping, eg older data having uniform day granularity 
and newer data having uniform hour granularity) and so the only time that 
windows will not be the full segment size is at the beginning and end of the 
total interval. And even having misalignment there will be a mistake if you're 
using this to re-ingest data into itself with appendToSegments: false since it 
will drop data in the start and end segments that are outside of the interval. 
So I think windowing is not going to be a super common use case in practice, 
and so doing a performance optimization that only matters for the windowing use 
case might not be worth the extra code complexity and more complicated 
documentation needed for maxInputSegmentBytesPerTask. (Of course the 
overlapping segment use case needs to be supported for correctness because 
that's how the Druid data model works. But that doesn't mean it needs 
performance optimizations?)
   
   If I understand correctly, windowing would happen depending on the value of 
`maxInputSegmentBytesPerTask` rather than on segment granularity, because a 
split is created whenever `bytesInCurrentSplit + segmentBytes` is larger than 
`maxInputSegmentBytesPerTask`. Am I missing something?
   
   > Size-based split does help keep subtasks equal, but it also has the 
purpose of bounding the amount of disk space needed for each task (because the 
current implementation downloads all segments at once at the beginning of the 
task), at least as long as all individual segments fall below the boundary. It 
would be a shame if a single task ended up requiring an enormous amount of disk 
space (and download time) if it ended up being assigned a large number of 
segments which only contribute a small fraction of their data to the output 
segment.
   
   Good point. I think it would be an issue, but maybe it would be better to 
handle it in `IngestSegmentFirehoseFactory`, for example by lazily downloading 
segments whenever they are needed (probably worth combining with 
`PrefetchableTextFilesFirehoseFactory`?). But this should be done in a 
separate PR, and I'm fine with going ahead with it as it is. Would you please 
add a comment about this concern?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to