maytasm commented on a change in pull request #10689:
URL: https://github.com/apache/druid/pull/10689#discussion_r546965354



##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
##########
@@ -931,25 +936,158 @@ public File merge(
       boolean rollup,
       final AggregatorFactory[] metricAggs,
       File outDir,
-      IndexSpec indexSpec
+      IndexSpec indexSpec,
+      int maxColumnsToMerge
   ) throws IOException
   {
-    return merge(indexes, rollup, metricAggs, outDir, indexSpec, new 
BaseProgressIndicator(), null);
+    return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new 
BaseProgressIndicator(), null, maxColumnsToMerge);
   }
 
-  private File merge(
+  private File multiphaseMerge(
       List<IndexableAdapter> indexes,
       final boolean rollup,
       final AggregatorFactory[] metricAggs,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
-      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
   ) throws IOException
   {
     FileUtils.deleteDirectory(outDir);
     org.apache.commons.io.FileUtils.forceMkdir(outDir);
 
+    List<File> tempDirs = new ArrayList<>();
+
+    if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) {
+      return merge(
+          indexes,
+          rollup,
+          metricAggs,
+          outDir,
+          indexSpec,
+          progress,
+          segmentWriteOutMediumFactory
+      );
+    }
+
+    List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes, 
maxColumnsToMerge);
+    List<File> currentOutputs = new ArrayList<>();
+
+    log.debug("base outDir: " + outDir);
+
+    try {
+      while (true) {
+        for (List<IndexableAdapter> phase : currentPhases) {

Review comment:
     Is it useful to log the size of currentPhases? It might help to see the 
progress, as the number should decrease after each pass.

##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
##########
@@ -931,25 +936,158 @@ public File merge(
       boolean rollup,
       final AggregatorFactory[] metricAggs,
       File outDir,
-      IndexSpec indexSpec
+      IndexSpec indexSpec,
+      int maxColumnsToMerge
   ) throws IOException
   {
-    return merge(indexes, rollup, metricAggs, outDir, indexSpec, new 
BaseProgressIndicator(), null);
+    return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new 
BaseProgressIndicator(), null, maxColumnsToMerge);
   }
 
-  private File merge(
+  private File multiphaseMerge(
       List<IndexableAdapter> indexes,
       final boolean rollup,
       final AggregatorFactory[] metricAggs,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
-      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
   ) throws IOException
   {
     FileUtils.deleteDirectory(outDir);
     org.apache.commons.io.FileUtils.forceMkdir(outDir);
 
+    List<File> tempDirs = new ArrayList<>();
+
+    if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) {
+      return merge(
+          indexes,
+          rollup,
+          metricAggs,
+          outDir,
+          indexSpec,
+          progress,
+          segmentWriteOutMediumFactory
+      );
+    }
+
+    List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes, 
maxColumnsToMerge);
+    List<File> currentOutputs = new ArrayList<>();
+
+    log.debug("base outDir: " + outDir);
+
+    try {
+      while (true) {

Review comment:
     Is it useful to log the iteration number of this loop?
   That is, how many times have we done a pass so far?

##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
##########
@@ -931,25 +936,158 @@ public File merge(
       boolean rollup,
       final AggregatorFactory[] metricAggs,
       File outDir,
-      IndexSpec indexSpec
+      IndexSpec indexSpec,
+      int maxColumnsToMerge
   ) throws IOException
   {
-    return merge(indexes, rollup, metricAggs, outDir, indexSpec, new 
BaseProgressIndicator(), null);
+    return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new 
BaseProgressIndicator(), null, maxColumnsToMerge);
   }
 
-  private File merge(
+  private File multiphaseMerge(
       List<IndexableAdapter> indexes,
       final boolean rollup,
       final AggregatorFactory[] metricAggs,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
-      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
   ) throws IOException
   {
     FileUtils.deleteDirectory(outDir);
     org.apache.commons.io.FileUtils.forceMkdir(outDir);
 
+    List<File> tempDirs = new ArrayList<>();
+
+    if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) {
+      return merge(
+          indexes,
+          rollup,
+          metricAggs,
+          outDir,
+          indexSpec,
+          progress,
+          segmentWriteOutMediumFactory
+      );
+    }
+
+    List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes, 
maxColumnsToMerge);
+    List<File> currentOutputs = new ArrayList<>();
+
+    log.debug("base outDir: " + outDir);
+
+    try {
+      while (true) {
+        for (List<IndexableAdapter> phase : currentPhases) {
+          File phaseOutDir;
+          if (currentPhases.size() == 1) {
+            // use the given outDir on the final merge phase
+            phaseOutDir = outDir;
+          } else {
+            phaseOutDir = FileUtils.createTempDir();
+            tempDirs.add(phaseOutDir);
+          }
+          log.debug("phase outDir: " + phaseOutDir);
+
+          File phaseOutput = merge(
+              phase,
+              rollup,
+              metricAggs,
+              phaseOutDir,
+              indexSpec,
+              progress,
+              segmentWriteOutMediumFactory
+          );
+          currentOutputs.add(phaseOutput);
+        }
+        if (currentOutputs.size() == 1) {
+          // we're done, we made a single File output
+          return currentOutputs.get(0);
+        } else {
+          // convert Files to QueryableIndexIndexableAdapter and do another 
merge phase
+          List<IndexableAdapter> qIndexAdapters = new ArrayList<>();
+          for (File outputFile : currentOutputs) {
+            QueryableIndex qIndex = indexIO.loadIndex(outputFile, true);
+            qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex));
+          }
+          currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge);
+          currentOutputs = new ArrayList<>();
+        }
+      }
+    }
+    finally {
+      for (File tempDir : tempDirs) {
+        if (tempDir.exists()) {
+          try {
+            FileUtils.deleteDirectory(tempDir);
+          }
+          catch (Exception e) {
+            log.warn(e, "Failed to remove directory[%s]", tempDir);
+          }
+        }
+      }
+    }
+  }
+
+  private List<List<IndexableAdapter>> getMergePhases(List<IndexableAdapter> 
indexes, int maxColumnsToMerge)
+  {
+    List<List<IndexableAdapter>> toMerge = new ArrayList<>();
+    // always merge at least two segments regardless of column limit
+    if (indexes.size() <= 2) {
+      if (getIndexColumnCount(indexes) > maxColumnsToMerge) {
+        log.warn("index pair has more columns than maxColumnsToMerge [%d].", 
maxColumnsToMerge);

Review comment:
     Should this be a warn, since we expect to always merge at least two 
segments regardless of the column limit? The warning may be misleading, as there is 
nothing to fix or change.

##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
##########
@@ -931,25 +936,158 @@ public File merge(
       boolean rollup,
       final AggregatorFactory[] metricAggs,
       File outDir,
-      IndexSpec indexSpec
+      IndexSpec indexSpec,
+      int maxColumnsToMerge
   ) throws IOException
   {
-    return merge(indexes, rollup, metricAggs, outDir, indexSpec, new 
BaseProgressIndicator(), null);
+    return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new 
BaseProgressIndicator(), null, maxColumnsToMerge);
   }
 
-  private File merge(
+  private File multiphaseMerge(
       List<IndexableAdapter> indexes,
       final boolean rollup,
       final AggregatorFactory[] metricAggs,
       File outDir,
       IndexSpec indexSpec,
       ProgressIndicator progress,
-      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      int maxColumnsToMerge
   ) throws IOException
   {
     FileUtils.deleteDirectory(outDir);
     org.apache.commons.io.FileUtils.forceMkdir(outDir);
 
+    List<File> tempDirs = new ArrayList<>();
+
+    if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) {
+      return merge(
+          indexes,
+          rollup,
+          metricAggs,
+          outDir,
+          indexSpec,
+          progress,
+          segmentWriteOutMediumFactory
+      );
+    }
+
+    List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes, 
maxColumnsToMerge);
+    List<File> currentOutputs = new ArrayList<>();
+
+    log.debug("base outDir: " + outDir);
+
+    try {
+      while (true) {
+        for (List<IndexableAdapter> phase : currentPhases) {
+          File phaseOutDir;
+          if (currentPhases.size() == 1) {
+            // use the given outDir on the final merge phase
+            phaseOutDir = outDir;
+          } else {
+            phaseOutDir = FileUtils.createTempDir();
+            tempDirs.add(phaseOutDir);
+          }
+          log.debug("phase outDir: " + phaseOutDir);
+
+          File phaseOutput = merge(
+              phase,
+              rollup,
+              metricAggs,
+              phaseOutDir,
+              indexSpec,
+              progress,
+              segmentWriteOutMediumFactory
+          );
+          currentOutputs.add(phaseOutput);
+        }
+        if (currentOutputs.size() == 1) {
+          // we're done, we made a single File output
+          return currentOutputs.get(0);
+        } else {
+          // convert Files to QueryableIndexIndexableAdapter and do another 
merge phase
+          List<IndexableAdapter> qIndexAdapters = new ArrayList<>();
+          for (File outputFile : currentOutputs) {
+            QueryableIndex qIndex = indexIO.loadIndex(outputFile, true);
+            qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex));
+          }
+          currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge);
+          currentOutputs = new ArrayList<>();
+        }
+      }
+    }
+    finally {
+      for (File tempDir : tempDirs) {
+        if (tempDir.exists()) {
+          try {
+            FileUtils.deleteDirectory(tempDir);
+          }
+          catch (Exception e) {
+            log.warn(e, "Failed to remove directory[%s]", tempDir);
+          }
+        }
+      }
+    }
+  }
+
+  private List<List<IndexableAdapter>> getMergePhases(List<IndexableAdapter> 
indexes, int maxColumnsToMerge)
+  {
+    List<List<IndexableAdapter>> toMerge = new ArrayList<>();
+    // always merge at least two segments regardless of column limit
+    if (indexes.size() <= 2) {
+      if (getIndexColumnCount(indexes) > maxColumnsToMerge) {
+        log.warn("index pair has more columns than maxColumnsToMerge [%d].", 
maxColumnsToMerge);
+      }
+      toMerge.add(indexes);
+    } else {
+      List<IndexableAdapter> currentPhase = new ArrayList<>();
+      int currentColumnCount = 0;
+      for (IndexableAdapter index : indexes) {
+        int indexColumnCount = getIndexColumnCount(index);
+        if (indexColumnCount > maxColumnsToMerge) {
+          log.warn("index has more columns [%d] than maxColumnsToMerge [%d]!", 
indexColumnCount, maxColumnsToMerge);

Review comment:
     Should this be a warn, since this can happen and is an expected / OK 
thing? The warning may be misleading, as there is nothing to fix or change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to