kfaraz commented on code in PR #18950:
URL: https://github.com/apache/druid/pull/18950#discussion_r2752701091
##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -79,6 +79,93 @@ public CompactionState(
this.projections = projections;
}
+ public Builder toBuilder()
+ {
+ return new Builder(this);
+ }
+
+ public static class Builder
Review Comment:
Nit: It would be cleaner to have this at the end of this file.
##########
processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularities.java:
##########
@@ -57,6 +57,7 @@ public class Granularities
   public static final Granularity FIFTEEN_MINUTE = GranularityType.FIFTEEN_MINUTE.getDefaultGranularity();
   public static final Granularity THIRTY_MINUTE = GranularityType.THIRTY_MINUTE.getDefaultGranularity();
   public static final Granularity HOUR = GranularityType.HOUR.getDefaultGranularity();
+  public static final Granularity THREE_HOUR = GranularityType.THREE_HOUR.getDefaultGranularity();
Review Comment:
Why is this new granularity needed?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java:
##########
@@ -42,6 +42,17 @@ public interface CompactionRunner
{
String TYPE_PROPERTY = "type";
+  /**
+   * Returns whether this runner requires aligned intervals for compaction.
+   * When true, the compaction task will throw an error if the IOConfig has allowNonAlignedInterval enabled.
+   *
+   * @return true if aligned intervals are required by this runner, false otherwise. Default is true.
+   */
+  default boolean requireAlignedInterval()
+  {
+    return true;
Review Comment:
I don't think we need a default implementation for this method. There are
only 2 concrete classes and we might as well implement this method for both of
them for clarity.
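
To illustrate the suggestion, a minimal sketch of dropping the default and implementing the method in each concrete runner. The class names `NativeCompactionRunner` and `MSQCompactionRunner` are assumed from the Druid codebase; the interfaces are simplified and the return values are placeholders, not the PR's actual choices.

```java
// Simplified sketch: no default body, so every runner must declare its requirement.
interface CompactionRunnerSketch
{
  boolean requireAlignedInterval();
}

class NativeCompactionRunner implements CompactionRunnerSketch
{
  @Override
  public boolean requireAlignedInterval()
  {
    return true; // placeholder: keeps the previous default behavior
  }
}

class MSQCompactionRunner implements CompactionRunnerSketch
{
  @Override
  public boolean requireAlignedInterval()
  {
    return true; // placeholder: the PR may choose a different value for MSQ
  }
}
```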
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -440,8 +448,7 @@ public int getPriority()
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
-    final List<DataSegment> segments = segmentProvider.findSegments(taskActionClient);
-    return determineLockGranularityAndTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
+    return determineLockGranularityAndTryLock(taskActionClient, List.of(segmentProvider.interval));
Review Comment:
I am not sure if it is safe to remove this. IIUC, the original logic was to
fetch all the segments that _overlap_ (even if partially) with the
`segmentProvider.interval`, then try to lock the umbrella interval of those
segments. So the final interval locked may be bigger than the original
interval. The idea is that a task will be able to replace only those segments
which are fully contained in the interval that is locked by that task.
@cecemei , could you share some more details on the issue that you
encountered with the MSQ runner and the hour granularity? Maybe there could be
an alternative solution.
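
To make the concern concrete, here is a minimal sketch of how partially overlapping segments stretch the umbrella interval beyond the requested one. Intervals are modeled as `[start, end)` millis for simplicity; the real code uses Joda `Interval`s and `JodaUtils.umbrellaInterval`.

```java
// Sketch of the locking concern: the umbrella of all overlapping segments
// can be strictly larger than the requested compaction interval.
final class UmbrellaSketch
{
  // Smallest interval covering all the given [start, end) intervals.
  static long[] umbrella(long[][] intervals)
  {
    long start = Long.MAX_VALUE;
    long end = Long.MIN_VALUE;
    for (long[] iv : intervals) {
      start = Math.min(start, iv[0]);
      end = Math.max(end, iv[1]);
    }
    return new long[]{start, end};
  }

  public static void main(String[] args)
  {
    // Requested compaction interval: [10, 20).
    // Segments [5, 12) and [18, 25) only partially overlap it.
    long[] result = umbrella(new long[][]{{5, 12}, {18, 25}});
    // The umbrella [5, 25) is bigger than the requested [10, 20),
    // so the lock would cover more than segmentProvider.interval.
    System.out.println(result[0] + ".." + result[1]); // prints "5..25"
  }
}
```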
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -635,12 +640,7 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
- JodaUtils.umbrellaInterval(
Review Comment:
Not sure if it is safe to do this: the umbrella interval need not be the
same as `segmentProvider.interval`, since some segments may only partially
overlap with `segmentProvider.interval`, which would make the umbrella
interval bigger.
##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -79,6 +79,93 @@ public CompactionState(
this.projections = projections;
}
+ public Builder toBuilder()
+ {
+ return new Builder(this);
+ }
+
+ public static class Builder
+ {
+ private PartitionsSpec partitionsSpec;
+ private DimensionsSpec dimensionsSpec;
+ private CompactionTransformSpec transformSpec;
+ private IndexSpec indexSpec;
+ private GranularitySpec granularitySpec;
+ private List<AggregatorFactory> metricsSpec;
+ @Nullable
+ private List<AggregateProjectionSpec> projections;
+
+ public Builder()
+ {
+ }
+
+ public Builder(CompactionState compactionState)
Review Comment:
I think this can be private.
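
A minimal sketch of the reviewer's point: the copy constructor can be private because callers only reach it through `toBuilder()`. The fields are simplified to a single `String` for illustration; the real `CompactionState.Builder` holds the specs shown in the diff above.

```java
// Simplified builder sketch with a private copy constructor.
final class CompactionStateSketch
{
  final String partitionsSpec;

  CompactionStateSketch(String partitionsSpec)
  {
    this.partitionsSpec = partitionsSpec;
  }

  Builder toBuilder()
  {
    // The enclosing class can still call the private constructor.
    return new Builder(this);
  }

  static final class Builder
  {
    private String partitionsSpec;

    Builder()
    {
    }

    // Private: only toBuilder() needs to copy an existing state.
    private Builder(CompactionStateSketch state)
    {
      this.partitionsSpec = state.partitionsSpec;
    }

    Builder partitionsSpec(String spec)
    {
      this.partitionsSpec = spec;
      return this;
    }

    CompactionStateSketch build()
    {
      return new CompactionStateSketch(partitionsSpec);
    }
  }
}
```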
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1146,7 +1146,8 @@ private void processProjections(final QueryableIndex
index)
final QueryableIndex projectionIndex = Preconditions.checkNotNull(
index.getProjectionQueryableIndex(schema.getName())
);
-    final List<DimensionSchema> columnSchemas = Lists.newArrayListWithExpectedSize(schema.getGroupingColumns().size());
+    final List<DimensionSchema> columnSchemas = Lists.newArrayListWithExpectedSize(schema.getGroupingColumns()
+                                                                                         .size());
Review Comment:
```suggestion
    final List<DimensionSchema> columnSchemas = Lists.newArrayListWithExpectedSize(
        schema.getGroupingColumns().size()
    );
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]