capistrant commented on code in PR #18950:
URL: https://github.com/apache/druid/pull/18950#discussion_r2742281320
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -246,6 +245,15 @@ public CompactionTask(
     this.compactionRunner = compactionRunner == null
                             ? new NativeCompactionRunner(segmentCacheManagerFactory)
                             : compactionRunner;
+    if (this.compactionRunner.requireAlignedInterval() && this.ioConfig.isAllowNonAlignedInterval()) {
+      throw new IAE(
+          "Invalid config: allowNonAlignedInterval is not allowed by runner[%s]",
+          this.compactionRunner.getClass()
+      );
+    }
+    if (this.compactionRunner.forceDropExisting() && !this.ioConfig.isDropExisting()) {
+      throw new IAE("Invalid config: runner[%s] must run with dropExisting", this.compactionRunner.getClass());
Review Comment:
nit
```suggestion
      throw new IAE("Invalid config: runner[%s] must run with dropExisting=true", this.compactionRunner.getClass());
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java:
##########
@@ -42,6 +42,28 @@ public interface CompactionRunner
 {
   String TYPE_PROPERTY = "type";
+  /**
+   * Returns whether this runner requires aligned intervals for compaction.
+   * When true, the compaction task will throw an error if the IOConfig has allowNonAlignedInterval enabled.
+   *
+   * @return true if aligned intervals are required by this runner, false otherwise. Default is true.
+   */
+  default boolean requireAlignedInterval()
+  {
+    return true;
+  }
+
+  /**
+   * Returns whether this runner requires dropExisting to be enabled for compaction.
+   * When true, the compaction task will fail if dropExisting is not set to true in the IOConfig.
+   *
+   * @return true if dropExisting must be enabled, false otherwise. Default is true.
+   */
+  default boolean forceDropExisting()
+  {
+    return true;
Review Comment:
Was this a nasty bug, that MSQ compaction didn't enforce dropExisting being
true in the past? Because I'm sure there are people using it in production
without setting their IO config explicitly, so I guess their tasks would now
fail until they set the config? I always thought dropExisting was a
native-only config, since MSQ always creates tombstones regardless of this
value. But I guess if `REPLACE_LEGACY` is used to make any other decisions
behind the scenes, it could be doing something unexpected?
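To make the compatibility concern concrete, here is a minimal, self-contained sketch. These are stand-in types, not Druid's actual classes: it just shows how a default method on the runner interface drives the new constructor check, and how a hypothetical runner could override `forceDropExisting()` to keep the old lenient behavior.

```java
// Stand-in for the CompactionRunner interface (sketch only, not Druid's class).
interface Runner
{
  // Mirrors the new interface default: runners require dropExisting=true unless they opt out.
  default boolean forceDropExisting()
  {
    return true;
  }
}

// Hypothetical runner that keeps the new strict default.
class StrictRunner implements Runner
{
}

// Hypothetical runner that opts out, preserving the old lenient behavior.
class LenientRunner implements Runner
{
  @Override
  public boolean forceDropExisting()
  {
    return false;
  }
}

public class DropExistingCheck
{
  // Mirrors the shape of the CompactionTask constructor validation: returns
  // false when the task would throw an IAE for this runner/ioConfig combination.
  static boolean accepts(Runner runner, boolean dropExisting)
  {
    return !(runner.forceDropExisting() && !dropExisting);
  }

  public static void main(String[] args)
  {
    System.out.println(accepts(new StrictRunner(), false));  // prints false: task would fail
    System.out.println(accepts(new StrictRunner(), true));   // prints true
    System.out.println(accepts(new LenientRunner(), false)); // prints true
  }
}
```

If backward compatibility is the worry, an override like `LenientRunner`'s (or flipping the interface default) would let existing tasks keep running unchanged.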
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java:
##########
@@ -95,12 +100,30 @@ public static DataSchema makeDataSchemaForIngestion(
destination.getDimensionSchemas()
);
+ final TransformSpec transformSpec;
+ if (query.getFilter() != null) {
+ List<Transform> transforms = new ArrayList<>();
+ for (VirtualColumn vc : query.getVirtualColumns().getVirtualColumns()) {
Review Comment:
Since CompactionTransformSpec doesn't support transforms, these wouldn't
actually end up in lastCompactionState for a data segment even if they
existed. I don't know if the answer is to consider supporting transforms for
CompactionState so this would flow through, or to not bother with this code
here, because it is confusing to extract the virtual columns and add them to
the data schema only for them to get ignored.
I was actually toying with the idea of modifying CompactionTransformSpec in
https://github.com/apache/druid/pull/18939 to support virtual columns being
passed through for cascading deletion filters that use virtual columns to
extract information from nested fields. Seeing this code, and how I was going
to accomplish that, I wonder if it is time to support the full TransformSpec
in compaction instead of just the filter? That is probably out of scope for
this PR.
I guess right now we have to decide:
1. leave this code even though it ends up ignored (at least for now)
2. remove it and just extract the filter to pass to the DataSchema builder
What do you think?
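For reference, option 2 would roughly look like the sketch below. The record types are stand-ins for Druid's DimFilter and TransformSpec, just to show the shape of a filter-only spec with no transform list.

```java
import java.util.List;

// Stand-in types for Druid's DimFilter and TransformSpec (sketch only).
record Filter(String expression) { }

record TransformSpecSketch(Filter filter, List<String> transforms) { }

public class FilterOnlySpec
{
  // Option 2 from the discussion: carry only the query filter into the
  // data schema and drop the virtual-column-derived transforms, since
  // CompactionTransformSpec would ignore them anyway.
  static TransformSpecSketch fromQueryFilter(Filter queryFilter)
  {
    return queryFilter == null ? null : new TransformSpecSketch(queryFilter, List.of());
  }

  public static void main(String[] args)
  {
    System.out.println(fromQueryFilter(new Filter("country = 'US'")));
    System.out.println(fromQueryFilter(null)); // no filter means no transform spec at all
  }
}
```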
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -440,8 +448,7 @@ public int getPriority()
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
-    final List<DataSegment> segments = segmentProvider.findSegments(taskActionClient);
-    return determineLockGranularityAndTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
+    return determineLockGranularityAndTryLock(taskActionClient, List.of(segmentProvider.interval));
Review Comment:
Is this what you refer to in `Native runner's index task now locks intervals
from CompactionTask instead of existing segments' intervals`? Isn't this code
on the path for both native and MSQ compaction? That is at least how I
interpret it. Not that I think that makes this wrong, but I'm trying to make
sure I understand the scope of the change.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -440,8 +448,7 @@ public int getPriority()
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
-    final List<DataSegment> segments = segmentProvider.findSegments(taskActionClient);
-    return determineLockGranularityAndTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
+    return determineLockGranularityAndTryLock(taskActionClient, List.of(segmentProvider.interval));
Review Comment:
An aside: I think this would make
`determineLockGranularityAndTryLockWithSegments` unused now. Do we want to
leave that around for future use or remove it?
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java:
##########
@@ -95,12 +100,30 @@ public static DataSchema makeDataSchemaForIngestion(
destination.getDimensionSchemas()
);
+ final TransformSpec transformSpec;
+ if (query.getFilter() != null) {
+ List<Transform> transforms = new ArrayList<>();
+ for (VirtualColumn vc : query.getVirtualColumns().getVirtualColumns()) {
Review Comment:
I thought about it more while reviewing the rest of the PR and think it makes
sense to keep the code. This code need not care how it is consumed; building
the accurate DataSchema feels right. If the transforms end up getting ignored
later for lastCompactionState, so be it.
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java:
##########
@@ -95,12 +100,30 @@ public static DataSchema makeDataSchemaForIngestion(
destination.getDimensionSchemas()
);
+    final TransformSpec transformSpec;
+    if (query.getFilter() != null) {
+      List<Transform> transforms = new ArrayList<>();
+      for (VirtualColumn vc : query.getVirtualColumns().getVirtualColumns()) {
+        if (vc instanceof ExpressionVirtualColumn) {
+          transforms.add(new ExpressionTransform(vc.getOutputName(), ((ExpressionVirtualColumn) vc).getExpression(), null));
+        }
+      }
+      transformSpec = new TransformSpec(query.getFilter(), transforms);
+    } else {
+      transformSpec = null;
+    }
     return DataSchema.builder()
                      .withDataSource(destination.getDataSource())
                      .withTimestamp(new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null))
                      .withDimensions(dimensionsAndAggregators.lhs)
                      .withAggregators(dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]))
-                     .withGranularity(makeGranularitySpecForIngestion(query, querySpec.getColumnMappings(), isRollupQuery, jsonMapper))
+                     .withGranularity(makeGranularitySpecForIngestion(
+                         query,
+                         querySpec.getColumnMappings(),
+                         isRollupQuery,
+                         jsonMapper
+                     ))
+                     .withTransform(transformSpec)
Review Comment:
Crazy that this has gone unnoticed in the community for so long. Maybe
compaction filters are rare (or MSQ compaction adoption may be low).
Either way, good find.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -246,6 +245,15 @@ public CompactionTask(
     this.compactionRunner = compactionRunner == null
                             ? new NativeCompactionRunner(segmentCacheManagerFactory)
                             : compactionRunner;
+    if (this.compactionRunner.requireAlignedInterval() && this.ioConfig.isAllowNonAlignedInterval()) {
+      throw new IAE(
+          "Invalid config: allowNonAlignedInterval is not allowed by runner[%s]",
Review Comment:
nit
```suggestion
          "Invalid config: allowNonAlignedInterval=true is not allowed by runner[%s]",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]