himanshug commented on a change in pull request #8115: Add shuffleSegmentPusher for data shuffle
URL: https://github.com/apache/incubator-druid/pull/8115#discussion_r308452967
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
##########
@@ -227,17 +250,68 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup
/**
* Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
* supervisorTaskId.
- *
- * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static
- * addSegment method.
*/
- public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile)
+ public long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+ throws IOException
{
+ // Get or create the location iterator for supervisorTask.
final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
supervisorTaskId,
- k -> Iterators.cycle(shuffleDataLocations)
+ k -> {
+ final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(shuffleDataLocations);
+ // Random start of the iterator
+ final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
+ IntStream.range(0, random).forEach(i -> cyclicIterator.next());
+ return cyclicIterator;
+ }
);
- addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile);
+
+ // Create a zipped segment in a temp directory.
+ final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
+ if (taskTempDir.mkdirs()) {
+ taskTempDir.deleteOnExit();
+ }
+ final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
+ final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile, true);
Review comment:
`fsync=true` is unnecessary here, since the zip file is only written to a temp location.
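
To illustrate the point, here is a minimal, self-contained sketch of a zip helper with an optional fsync step. It is not Druid's `CompressionUtils.zip`; the class name `TempZipSketch` and this `zip` method are hypothetical and use only `java.util.zip`. It assumes the segment directory holds only regular files at the top level.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical sketch, not Druid's CompressionUtils.
public class TempZipSketch
{
  /**
   * Zips the regular files directly under dir into outFile and returns the
   * total number of uncompressed bytes written.
   */
  static long zip(File dir, File outFile, boolean fsync) throws IOException
  {
    final File[] files = dir.listFiles(File::isFile);
    if (files == null) {
      throw new IOException("Not a readable directory: " + dir);
    }
    long totalBytes = 0;
    try (FileOutputStream fos = new FileOutputStream(outFile);
         ZipOutputStream zos = new ZipOutputStream(fos)) {
      for (File f : files) {
        zos.putNextEntry(new ZipEntry(f.getName()));
        totalBytes += Files.copy(f.toPath(), zos);
        zos.closeEntry();
      }
      // Write the zip central directory without closing the underlying stream.
      zos.finish();
      zos.flush();
      if (fsync) {
        // Forces content and metadata to the storage device. This only pays
        // off when the file must survive a crash, which a temp file need not.
        fos.getChannel().force(true);
      }
    }
    return totalBytes;
  }
}
```

With a helper shaped like this, a call that writes to a task temp directory could pass `false` for the last argument and skip the sync. If the zipped file is later copied to its final location, that copy is where durability would matter.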
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services