maytasm commented on a change in pull request #10689:
URL: https://github.com/apache/druid/pull/10689#discussion_r546965354
##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
##########
@@ -931,25 +936,158 @@ public File merge(
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
- IndexSpec indexSpec
+ IndexSpec indexSpec,
+ int maxColumnsToMerge
) throws IOException
{
- return merge(indexes, rollup, metricAggs, outDir, indexSpec, new
BaseProgressIndicator(), null);
+ return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new
BaseProgressIndicator(), null, maxColumnsToMerge);
}
- private File merge(
+ private File multiphaseMerge(
List<IndexableAdapter> indexes,
final boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress,
- @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ int maxColumnsToMerge
) throws IOException
{
FileUtils.deleteDirectory(outDir);
org.apache.commons.io.FileUtils.forceMkdir(outDir);
+ List<File> tempDirs = new ArrayList<>();
+
+ if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) {
+ return merge(
+ indexes,
+ rollup,
+ metricAggs,
+ outDir,
+ indexSpec,
+ progress,
+ segmentWriteOutMediumFactory
+ );
+ }
+
+ List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes,
maxColumnsToMerge);
+ List<File> currentOutputs = new ArrayList<>();
+
+ log.debug("base outDir: " + outDir);
+
+ try {
+ while (true) {
+ for (List<IndexableAdapter> phase : currentPhases) {
Review comment:
Is it useful to log the size of currentPhases? It might help show progress,
as the number should decrease after each pass.
##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
##########
@@ -931,25 +936,158 @@ public File merge(
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
- IndexSpec indexSpec
+ IndexSpec indexSpec,
+ int maxColumnsToMerge
) throws IOException
{
- return merge(indexes, rollup, metricAggs, outDir, indexSpec, new
BaseProgressIndicator(), null);
+ return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new
BaseProgressIndicator(), null, maxColumnsToMerge);
}
- private File merge(
+ private File multiphaseMerge(
List<IndexableAdapter> indexes,
final boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress,
- @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ int maxColumnsToMerge
) throws IOException
{
FileUtils.deleteDirectory(outDir);
org.apache.commons.io.FileUtils.forceMkdir(outDir);
+ List<File> tempDirs = new ArrayList<>();
+
+ if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) {
+ return merge(
+ indexes,
+ rollup,
+ metricAggs,
+ outDir,
+ indexSpec,
+ progress,
+ segmentWriteOutMediumFactory
+ );
+ }
+
+ List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes,
maxColumnsToMerge);
+ List<File> currentOutputs = new ArrayList<>();
+
+ log.debug("base outDir: " + outDir);
+
+ try {
+ while (true) {
Review comment:
Is it useful to log the iteration number of this loop, i.e., how many
passes we have done so far?
##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
##########
@@ -931,25 +936,158 @@ public File merge(
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
- IndexSpec indexSpec
+ IndexSpec indexSpec,
+ int maxColumnsToMerge
) throws IOException
{
- return merge(indexes, rollup, metricAggs, outDir, indexSpec, new
BaseProgressIndicator(), null);
+ return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new
BaseProgressIndicator(), null, maxColumnsToMerge);
}
- private File merge(
+ private File multiphaseMerge(
List<IndexableAdapter> indexes,
final boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress,
- @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ int maxColumnsToMerge
) throws IOException
{
FileUtils.deleteDirectory(outDir);
org.apache.commons.io.FileUtils.forceMkdir(outDir);
+ List<File> tempDirs = new ArrayList<>();
+
+ if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) {
+ return merge(
+ indexes,
+ rollup,
+ metricAggs,
+ outDir,
+ indexSpec,
+ progress,
+ segmentWriteOutMediumFactory
+ );
+ }
+
+ List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes,
maxColumnsToMerge);
+ List<File> currentOutputs = new ArrayList<>();
+
+ log.debug("base outDir: " + outDir);
+
+ try {
+ while (true) {
+ for (List<IndexableAdapter> phase : currentPhases) {
+ File phaseOutDir;
+ if (currentPhases.size() == 1) {
+ // use the given outDir on the final merge phase
+ phaseOutDir = outDir;
+ } else {
+ phaseOutDir = FileUtils.createTempDir();
+ tempDirs.add(phaseOutDir);
+ }
+ log.debug("phase outDir: " + phaseOutDir);
+
+ File phaseOutput = merge(
+ phase,
+ rollup,
+ metricAggs,
+ phaseOutDir,
+ indexSpec,
+ progress,
+ segmentWriteOutMediumFactory
+ );
+ currentOutputs.add(phaseOutput);
+ }
+ if (currentOutputs.size() == 1) {
+ // we're done, we made a single File output
+ return currentOutputs.get(0);
+ } else {
+ // convert Files to QueryableIndexIndexableAdapter and do another
merge phase
+ List<IndexableAdapter> qIndexAdapters = new ArrayList<>();
+ for (File outputFile : currentOutputs) {
+ QueryableIndex qIndex = indexIO.loadIndex(outputFile, true);
+ qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex));
+ }
+ currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge);
+ currentOutputs = new ArrayList<>();
+ }
+ }
+ }
+ finally {
+ for (File tempDir : tempDirs) {
+ if (tempDir.exists()) {
+ try {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to remove directory[%s]", tempDir);
+ }
+ }
+ }
+ }
+ }
+
+ private List<List<IndexableAdapter>> getMergePhases(List<IndexableAdapter>
indexes, int maxColumnsToMerge)
+ {
+ List<List<IndexableAdapter>> toMerge = new ArrayList<>();
+ // always merge at least two segments regardless of column limit
+ if (indexes.size() <= 2) {
+ if (getIndexColumnCount(indexes) > maxColumnsToMerge) {
+ log.warn("index pair has more columns than maxColumnsToMerge [%d].",
maxColumnsToMerge);
Review comment:
Should this really be a warn, given that we always expect to merge at least
two segments regardless of the column limit? The warning may be misleading,
as there is nothing for the user to fix or change.
##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
##########
@@ -931,25 +936,158 @@ public File merge(
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
- IndexSpec indexSpec
+ IndexSpec indexSpec,
+ int maxColumnsToMerge
) throws IOException
{
- return merge(indexes, rollup, metricAggs, outDir, indexSpec, new
BaseProgressIndicator(), null);
+ return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new
BaseProgressIndicator(), null, maxColumnsToMerge);
}
- private File merge(
+ private File multiphaseMerge(
List<IndexableAdapter> indexes,
final boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress,
- @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ int maxColumnsToMerge
) throws IOException
{
FileUtils.deleteDirectory(outDir);
org.apache.commons.io.FileUtils.forceMkdir(outDir);
+ List<File> tempDirs = new ArrayList<>();
+
+ if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) {
+ return merge(
+ indexes,
+ rollup,
+ metricAggs,
+ outDir,
+ indexSpec,
+ progress,
+ segmentWriteOutMediumFactory
+ );
+ }
+
+ List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes,
maxColumnsToMerge);
+ List<File> currentOutputs = new ArrayList<>();
+
+ log.debug("base outDir: " + outDir);
+
+ try {
+ while (true) {
+ for (List<IndexableAdapter> phase : currentPhases) {
+ File phaseOutDir;
+ if (currentPhases.size() == 1) {
+ // use the given outDir on the final merge phase
+ phaseOutDir = outDir;
+ } else {
+ phaseOutDir = FileUtils.createTempDir();
+ tempDirs.add(phaseOutDir);
+ }
+ log.debug("phase outDir: " + phaseOutDir);
+
+ File phaseOutput = merge(
+ phase,
+ rollup,
+ metricAggs,
+ phaseOutDir,
+ indexSpec,
+ progress,
+ segmentWriteOutMediumFactory
+ );
+ currentOutputs.add(phaseOutput);
+ }
+ if (currentOutputs.size() == 1) {
+ // we're done, we made a single File output
+ return currentOutputs.get(0);
+ } else {
+ // convert Files to QueryableIndexIndexableAdapter and do another
merge phase
+ List<IndexableAdapter> qIndexAdapters = new ArrayList<>();
+ for (File outputFile : currentOutputs) {
+ QueryableIndex qIndex = indexIO.loadIndex(outputFile, true);
+ qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex));
+ }
+ currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge);
+ currentOutputs = new ArrayList<>();
+ }
+ }
+ }
+ finally {
+ for (File tempDir : tempDirs) {
+ if (tempDir.exists()) {
+ try {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to remove directory[%s]", tempDir);
+ }
+ }
+ }
+ }
+ }
+
+ private List<List<IndexableAdapter>> getMergePhases(List<IndexableAdapter>
indexes, int maxColumnsToMerge)
+ {
+ List<List<IndexableAdapter>> toMerge = new ArrayList<>();
+ // always merge at least two segments regardless of column limit
+ if (indexes.size() <= 2) {
+ if (getIndexColumnCount(indexes) > maxColumnsToMerge) {
+ log.warn("index pair has more columns than maxColumnsToMerge [%d].",
maxColumnsToMerge);
+ }
+ toMerge.add(indexes);
+ } else {
+ List<IndexableAdapter> currentPhase = new ArrayList<>();
+ int currentColumnCount = 0;
+ for (IndexableAdapter index : indexes) {
+ int indexColumnCount = getIndexColumnCount(index);
+ if (indexColumnCount > maxColumnsToMerge) {
+ log.warn("index has more columns [%d] than maxColumnsToMerge [%d]!",
indexColumnCount, maxColumnsToMerge);
Review comment:
Should this really be a warn, given that this can happen and is an expected,
acceptable case? The warning may be misleading, as there is nothing for the user to fix or change.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]