gianm commented on code in PR #15495:
URL: https://github.com/apache/druid/pull/15495#discussion_r1425741792
##########
docs/multi-stage-query/reference.md:
##########
@@ -250,7 +250,7 @@ The following table lists the context parameters for the MSQ task engine:
| `waitUntilSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest query waits for the generated segments to be loaded before exiting; otherwise, it exits without waiting. The task and live reports contain information about the status of segment loading if this flag is set. This ensures that any queries made after the ingestion exits will include results from the ingestion. The drawback is that the controller task will stall until the segments are loaded. | `false` |
| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the sources that are queried for results, in addition to the segments present on deep storage. Can be `NONE` or `REALTIME`. If `NONE`, only non-realtime (published and used) segments are downloaded from deep storage. If `REALTIME`, results from realtime tasks are also included. | `NONE` |
| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default.<br />This property takes effect only when `selectDestination` is set to `durableStorage`. | 100000 |
-
+| `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to true, an ingest query that generates no output rows throws an `InsertCannotBeEmpty` fault. When set to false (the default), an INSERT query that generates no output rows is a no-op, and a REPLACE query that generates no output rows deletes all data that matches the OVERWRITE clause. | `false` |
Review Comment:
Nice, clear wording. Nit: I think these generally work best with the default listed first, since that emphasizes the out-of-the-box behavior.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -957,7 +961,8 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecs(
final RowSignature signature,
final ClusterBy clusterBy,
final ClusterByPartitions partitionBoundaries,
- final boolean mayHaveMultiValuedClusterByFields
+ final boolean mayHaveMultiValuedClusterByFields,
+ final Boolean isStageOutputEmpty
Review Comment:
I guess this is capital-B because `null` means unknown. In that case, please annotate it with `@Nullable` and explain in the javadoc what `null` means. (It's rarely obvious without studying the code, and we'd like that to not be necessary whenever possible.)
And do the same thing in the methods that this one calls out to.
Or, the other option is to define an enum, like `Capable`. We probably shouldn't use `Capable` itself, since it's not really the right semantics, but it's a technique that aids understanding.
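For illustration, a tri-state enum along the lines suggested could make the three cases explicit. The name `StageOutputState`, its values, and the adapter method below are hypothetical, not from the PR:

```java
// Hypothetical sketch: an enum alternative to a @Nullable Boolean whose
// null value means "unknown". Names are illustrative, not from the PR.
public class StageOutputStateDemo
{
  enum StageOutputState
  {
    EMPTY,
    NONEMPTY,
    UNKNOWN;

    // Adapter for the existing nullable-Boolean convention, where
    // null means "unknown".
    static StageOutputState fromNullableBoolean(final Boolean isEmpty)
    {
      if (isEmpty == null) {
        return UNKNOWN;
      }
      return isEmpty ? EMPTY : NONEMPTY;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(StageOutputState.fromNullableBoolean(Boolean.TRUE));  // EMPTY
    System.out.println(StageOutputState.fromNullableBoolean(Boolean.FALSE)); // NONEMPTY
    System.out.println(StageOutputState.fromNullableBoolean(null));          // UNKNOWN
  }
}
```

The enum forces every caller to handle the unknown case explicitly, which is the readability benefit the comment is pointing at.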
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -2689,20 +2711,15 @@ private void startStages() throws IOException, InterruptedException
}
       final StageId shuffleStageId = new StageId(queryDef.getQueryId(), shuffleStageNumber);
-      final boolean isTimeBucketed = isTimeBucketedIngestion(task.getQuerySpec());
-      final ClusterByPartitions partitionBoundaries = queryKernel.getResultPartitionBoundariesForStage(shuffleStageId);
+      final Boolean isShuffleStageOutputEmpty = queryKernel.isStageOutputEmpty(shuffleStageId);
-      // We require some data to be inserted in case it is partitioned by anything other than all and we are
-      // inserting everything into a single bucket. This can be handled more gracefully instead of throwing an exception
-      // Note: This can also be the case when we have limit queries but validation in Broker SQL layer prevents such
-      // queries
-      if (isTimeBucketed && partitionBoundaries.equals(ClusterByPartitions.oneUniversalPartition())) {
+      if (!isFailOnEmptyInsertEnabled && Boolean.TRUE.equals(isShuffleStageOutputEmpty)) {
Review Comment:
I think this might end up being unknown in some situations where we're using
`PARTITIONED BY ALL` or `LIMIT`. I don't see tests for those, so please add
them, and then let's consider what that tells us.
I suspect there may be some cases where the "is output empty" signal ends up being unknown when there is in fact no data, meaning `failOnEmptyInsert: true` won't always behave as we expect. (Sometimes the empty insert will go through even with `failOnEmptyInsert: true`.)
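The mechanics behind this concern: with the nullable-`Boolean` convention, `Boolean.TRUE.equals(...)` treats the unknown (`null`) case exactly like known-nonempty, so an unknown emptiness result silently skips any empty-insert handling. A minimal standalone sketch (variable names are illustrative):

```java
// Illustrates why an unknown (null) emptiness flag takes the same branch
// as a known-nonempty one under Boolean.TRUE.equals(...).
public class BooleanTriStateDemo
{
  public static void main(String[] args)
  {
    final Boolean knownEmpty = Boolean.TRUE;
    final Boolean knownNonEmpty = Boolean.FALSE;
    final Boolean unknown = null; // e.g. stage statistics unavailable

    System.out.println(Boolean.TRUE.equals(knownEmpty));    // true: empty-insert branch taken
    System.out.println(Boolean.TRUE.equals(knownNonEmpty)); // false
    System.out.println(Boolean.TRUE.equals(unknown));       // false: unknown treated like non-empty
  }
}
```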
##########
docs/multi-stage-query/reference.md:
##########
@@ -429,7 +429,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_ColumnNameRestricted">`ColumnNameRestricted`</a> | The query uses a restricted column name. | `columnName`: The restricted column name. |
| <a name="error_ColumnTypeNotSupported">`ColumnTypeNotSupported`</a> | The column type is not supported. This can be because:<br /> <br /><ul><li>Support for writing or reading from a particular column type is not supported.</li><li>The query attempted to use a column type that is not supported by the frame format. This occurs with ARRAY types, which are not yet implemented for frames.</li></ul> | `columnName`: The column name with an unsupported type.<br /> <br />`columnType`: The unknown column type. |
| <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to a conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul> <br /> <br /> Use REPLACE to overwrite the existing data, or if the error contains the `allocatedInterval`, rerun the INSERT job with the mentioned granularity to append to existing data. Note that it might not always be possible to append to the existing data using INSERT; it can only be done if `allocatedInterval` is present. | `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation.<br /> <br />`allocatedInterval`: The incorrect interval allocated by the overlord. It can be null. |
-| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
+| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success, i.e., when the `failOnEmptyInsert` query context parameter is set to true. `failOnEmptyInsert` defaults to false, so an INSERT query that generates no output rows is a no-op, and a REPLACE query that generates no output rows deletes all data that matches the OVERWRITE clause. | `dataSource` |
Review Comment:
Let's get rid of the phrase "in a situation where output rows are required for success" — it's only there because we felt that describing the actual logic would be too weird and idiosyncratic. But now that things are harmonized by this new parameter, the logic isn't weird anymore and we can remove the vague wording.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotBeEmptyFault.java:
##########
@@ -38,7 +38,8 @@ public InsertCannotBeEmptyFault(
@JsonProperty("dataSource") final String dataSource
)
{
- super(CODE, "No rows to insert for dataSource [%s]", dataSource);
+    super(CODE, "No rows to insert for dataSource[%s]. Set failOnEmptyInsert=false"
Review Comment:
Typically we use colons, like `failOnEmptyInsert: false`, since that's more JSON-y, and this stuff typically gets specified in JSON.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -985,15 +993,20 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForAppend(
final DataSourceMSQDestination destination,
final ClusterByPartitions partitionBoundaries,
final RowKeyReader keyReader,
- final TaskLockType taskLockType
+ final TaskLockType taskLockType,
+ final Boolean isStageOutputEmpty
) throws IOException
{
+    final List<SegmentIdWithShardSpec> retVal = new ArrayList<>(partitionBoundaries.size());
+
+ if (Boolean.TRUE.equals(isStageOutputEmpty)) {
+ return retVal;
Review Comment:
Nit: might as well return `Collections.emptyList()` prior to creating `retVal`.
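A minimal sketch of the suggested early-exit pattern. The method below is a simplified stand-in, not the actual Druid signature; the point is returning the shared immutable empty list before allocating `retVal`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EmptyListDemo
{
  // Hypothetical simplification of the generator method's early-exit path.
  public static List<String> segmentIds(final Boolean isStageOutputEmpty, final int partitions)
  {
    if (Boolean.TRUE.equals(isStageOutputEmpty)) {
      // No allocation needed: Collections.emptyList() returns a shared
      // immutable instance.
      return Collections.emptyList();
    }
    final List<String> retVal = new ArrayList<>(partitions);
    for (int i = 0; i < partitions; i++) {
      retVal.add("segment-" + i);
    }
    return retVal;
  }

  public static void main(String[] args)
  {
    System.out.println(segmentIds(true, 5));  // []
    System.out.println(segmentIds(false, 2)); // [segment-0, segment-1]
  }
}
```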
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -114,6 +114,10 @@ public class MultiStageQueryContext
public static final String CTX_FAULT_TOLERANCE = "faultTolerance";
public static final boolean DEFAULT_FAULT_TOLERANCE = false;
+
+ public static final String CTX_FAIL_ON_EMPTY_INSERT = "failOnEmptyInsert";
+ public static final boolean DEFAULT_FAIL_ON_EMPTY_INSERT = true;
Review Comment:
I thought `false` was supposed to be the default?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1062,9 +1075,14 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForReplace(
final RowSignature signature,
final ClusterBy clusterBy,
final ClusterByPartitions partitionBoundaries,
- final boolean mayHaveMultiValuedClusterByFields
+ final boolean mayHaveMultiValuedClusterByFields,
+ final Boolean isStageOutputEmpty
) throws IOException
{
+ if (Boolean.TRUE.equals(isStageOutputEmpty)) {
+ return new ArrayList<>(partitionBoundaries.size());
Review Comment:
might as well be `Collections.emptyList()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]