This is an automated email from the ASF dual-hosted git repository.
adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 269e035e768 Add validation for reindex with realtime sources (#16390)
269e035e768 is described below
commit 269e035e76850e18b125dd8cce77ca90c7f93730
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue May 7 10:32:15 2024 +0530
Add validation for reindex with realtime sources (#16390)
Add validation for reindex with realtime sources.
With the addition of concurrent compaction, it is possible to ingest data
while querying from realtime sources with MSQ into the same datasource. This
could potentially lead to issues if the interval that is ingested into is
replaced by an MSQ job, which has queried only some of the data from the
realtime task.
This PR adds validation to check that the datasource being ingested into is
not being queried from, if the query includes realtime sources.
---
docs/multi-stage-query/reference.md | 2 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 3 ++
.../druid/msq/indexing/MSQControllerTask.java | 2 +-
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 2 ++
.../druid/msq/util/MSQTaskQueryMakerUtils.java | 24 ++++++++++++++++
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 32 ++++++++++++++++++++++
6 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/docs/multi-stage-query/reference.md
b/docs/multi-stage-query/reference.md
index 20d0ef2f7e6..aeb4305f31e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -405,7 +405,7 @@ The following table lists the context parameters for the
MSQ task engine:
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on
fault tolerance mode or not. Failed workers are retried based on
[Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly
set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of
the select query is written. <br />Use `taskReport`(the default) to write
select results to the task report. <b> This is not scalable since task reports
size explodes for large results </b> <br/>Use `durableStorage` to write results
to a durable storage location. <b>For large result sets, it's recommended to use
`durableStorage`</b>. To configure durable storage see
[`this`](#durable-storage) section. | `taskReport` |
| `waitUntilSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest
query waits for the generated segment to be loaded before exiting, else the
ingest query exits without waiting. The task and live reports contain the
information about the status of loading segments if this flag is set. This will
ensure that any future queries made after the ingestion exits will include
results from the ingestion. The drawback is that the controller task will stall
till the segments are loaded. | [...]
-| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the
sources, which will be queried for results in addition to the segments present
on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only
non-realtime (published and used) segments will be downloaded from deep
storage. If this value is `REALTIME`, results will also be included from
realtime tasks. | `NONE` |
+| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the
sources, which will be queried for results in addition to the segments present
on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only
non-realtime (published and used) segments will be downloaded from deep
storage. If this value is `REALTIME`, results will also be included from
realtime tasks. `REALTIME` cannot be used while writing data into the same
datasource it is read from. | `NONE` |
| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The
actual number of rows per page may be somewhat higher or lower than this
number. In most cases, use the default.<br /> This property comes into effect
only when `selectDestination` is set to `durableStorage` | 100000 |
| `skipTypeVerification` | INSERT or REPLACE<br /><br />During query
validation, Druid validates that [string arrays](../querying/arrays.md) and
[multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed
in the same column. If you are intentionally migrating from one to the other,
use this context parameter to disable type validation.<br /><br />Provide the
column list as comma-separated values or as a JSON array in string form.| empty
list |
| `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the
default), an INSERT query generating no output rows will be no-op, and a
REPLACE query generating no output rows will delete all data that matches the
OVERWRITE clause. When set to true, an ingest query generating no output rows
will throw an `InsertCannotBeEmpty` fault. | `false` |
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b10fbe76ecf..db7c6838ba7 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -171,6 +171,7 @@ import org.apache.druid.msq.util.ArrayIngestMode;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.msq.util.IntervalUtils;
import org.apache.druid.msq.util.MSQFutureUtils;
+import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
import org.apache.druid.query.Query;
@@ -1691,6 +1692,8 @@ public class ControllerImpl implements Controller
throw new ISE("Column names are not unique: [%s]",
columnMappings.getOutputColumnNames());
}
+ MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
+
if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME)) {
// We know there's a single time column, because we've checked
columnMappings.hasUniqueOutputColumnNames().
final int timeColumn =
columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME).getInt(0);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index e645e0e62cd..2a435215144 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -315,7 +315,7 @@ public class MSQControllerTask extends AbstractTask
implements ClientTaskQuery,
}
/**
- * Returns true if the task reads from the same table as the destionation.
In this case, we would prefer to fail
+ * Returns true if the task reads from the same table as the destination. In
this case, we would prefer to fail
* instead of reading any unused segments to ensure that old data is not
read.
*/
public static boolean isReplaceInputDataSourceTask(MSQSpec querySpec)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 4d776260798..8cc34547f99 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -281,6 +281,8 @@ public class MSQTaskQueryMaker implements QueryMaker
.tuningConfig(new MSQTuningConfig(maxNumWorkers,
maxRowsInMemory, rowsPerSegment, indexSpec))
.build();
+ MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
+
final MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
querySpec.withOverriddenContext(nativeQueryContext),
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
index 3a2de0da96a..ae1558babf5 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
@@ -20,8 +20,13 @@
package org.apache.druid.msq.util;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.indexing.MSQSpec;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.segment.column.ColumnHolder;
import java.util.Collection;
@@ -90,4 +95,23 @@ public class MSQTaskQueryMakerUtils
throw new IAE("Segment sort order must begin with column [%s]",
ColumnHolder.TIME_COLUMN_NAME);
}
}
+
+ /**
+ * Validates that a query does not read from a datasource that it is
ingesting data into, if realtime segments are
+ * being queried.
+ */
+ public static void validateRealtimeReindex(final MSQSpec querySpec)
+ {
+ final SegmentSource segmentSources =
MultiStageQueryContext.getSegmentSources(querySpec.getQuery().context());
+ if (MSQControllerTask.isReplaceInputDataSourceTask(querySpec) &&
SegmentSource.REALTIME.equals(segmentSources)) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("Cannot ingest into datasource[%s] since it
is also being queried from, with "
+ + "REALTIME segments included. Ingest to a
different datasource, or disable querying "
+ + "of realtime segments by modifying [%s] in
the query context.",
+ ((DataSourceMSQDestination)
querySpec.getDestination()).getDataSource(),
+
MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE
+ );
+ }
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 227a9656a14..174e09243ae 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -29,6 +29,8 @@ import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -57,6 +59,7 @@ import
org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatcher;
@@ -1840,6 +1843,35 @@ public class MSQReplaceTest extends MSQTestBase
.verifyResults();
}
+ @Test
+ void testRealtimeQueryWithReindexShouldThrowException()
+ {
+ Map<String, Object> context = ImmutableMap.<String, Object>builder()
+ .putAll(DEFAULT_MSQ_CONTEXT)
+
.put(MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE,
SegmentSource.REALTIME.name())
+ .build();
+
+ testIngestQuery().setSql(
+ "REPLACE INTO foo"
+ + " OVERWRITE ALL"
+ + " SELECT *"
+ + " FROM foo"
+ + " PARTITIONED BY DAY")
+ .setQueryContext(context)
+ .setExpectedValidationErrorMatcher(
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "general"
+ ).expectMessageContains(
+ "Cannot ingest into datasource[foo] since it is
also being queried from, with REALTIME "
+ + "segments included. Ingest to a different
datasource, or disable querying of realtime "
+ + "segments by modifying [includeSegmentSource]
in the query context.")
+ )
+ .verifyPlanningErrors();
+
+ }
+
@Nonnull
private Set<SegmentId> expectedFooSegments()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]