This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/30.0.0 by this push:
new cb1bc70d3f1 [Backport] Add validation for reindex with realtime
sources (#16410)
cb1bc70d3f1 is described below
commit cb1bc70d3f1f204c2f1711692477ddc2829885ee
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu May 9 17:51:11 2024 +0530
[Backport] Add validation for reindex with realtime sources (#16410)
* Add validation for reindex with realtime sources (#16390)
Add validation for reindex with realtime sources.
With the addition of concurrent compaction, an MSQ job can ingest data into a
datasource while simultaneously querying realtime sources from that same
datasource. This could lead to issues if the interval being ingested into is
replaced by an MSQ job that has queried only some of the data from the
realtime task.
This PR adds validation to check that the datasource being ingested into is
not being queried from, if the query includes realtime sources.
* Fix conflicts
---
docs/multi-stage-query/reference.md | 2 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 5 +++-
.../druid/msq/indexing/MSQControllerTask.java | 17 ++++++------
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 2 ++
.../druid/msq/util/MSQTaskQueryMakerUtils.java | 24 ++++++++++++++++
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 32 ++++++++++++++++++++++
6 files changed, 72 insertions(+), 10 deletions(-)
diff --git a/docs/multi-stage-query/reference.md
b/docs/multi-stage-query/reference.md
index 20d0ef2f7e6..aeb4305f31e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -405,7 +405,7 @@ The following table lists the context parameters for the
MSQ task engine:
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on
fault tolerance mode or not. Failed workers are retried based on
[Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly
set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of
the select query is written. <br />Use `taskReport`(the default) to write
select results to the task report. <b> This is not scalable since task reports
size explodes for large results </b> <br/>Use `durableStorage` to write results
to durable storage location. <b>For large results sets, its recommended to use
`durableStorage` </b>. To configure durable storage see
[`this`](#durable-storage) section. | `taskReport` |
| `waitUntilSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest
query waits for the generated segment to be loaded before exiting, else the
ingest query exits without waiting. The task and live reports contain the
information about the status of loading segments if this flag is set. This will
ensure that any future queries made after the ingestion exits will include
results from the ingestion. The drawback is that the controller task will stall
till the segments are loaded. | [...]
-| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the
sources, which will be queried for results in addition to the segments present
on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only
non-realtime (published and used) segments will be downloaded from deep
storage. If this value is `REALTIME`, results will also be included from
realtime tasks. | `NONE` |
+| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the
sources, which will be queried for results in addition to the segments present
on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only
non-realtime (published and used) segments will be downloaded from deep
storage. If this value is `REALTIME`, results will also be included from
realtime tasks. `REALTIME` cannot be used while writing data into the same
datasource it is read from.| `NONE` |
| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The
actual number of rows per page may be somewhat higher or lower than this
number. In most cases, use the default.<br /> This property comes into effect
only when `selectDestination` is set to `durableStorage` | 100000 |
| `skipTypeVerification` | INSERT or REPLACE<br /><br />During query
validation, Druid validates that [string arrays](../querying/arrays.md) and
[multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed
in the same column. If you are intentionally migrating from one to the other,
use this context parameter to disable type validation.<br /><br />Provide the
column list as comma-separated values or as a JSON array in string form.| empty
list |
| `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the
default), an INSERT query generating no output rows will be no-op, and a
REPLACE query generating no output rows will delete all data that matches the
OVERWRITE clause. When set to true, an ingest query generating no output rows
will throw an `InsertCannotBeEmpty` fault. | `false` |
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 682e2b484e4..83e53ee60ec 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -189,6 +189,7 @@ import org.apache.druid.msq.util.ArrayIngestMode;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.msq.util.IntervalUtils;
import org.apache.druid.msq.util.MSQFutureUtils;
+import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
import org.apache.druid.msq.util.SqlStatementResourceHelper;
@@ -716,7 +717,7 @@ public class ControllerImpl implements Controller
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_IS_REINDEX,
- MSQControllerTask.isReplaceInputDataSourceTask(task)
+ MSQControllerTask.isReplaceInputDataSourceTask(task.getQuerySpec())
);
// propagate the controller's tags to the worker task for enhanced metrics
reporting
@@ -1932,6 +1933,8 @@ public class ControllerImpl implements Controller
throw new ISE("Column names are not unique: [%s]",
columnMappings.getOutputColumnNames());
}
+ MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
+
if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME)) {
// We know there's a single time column, because we've checked
columnMappings.hasUniqueOutputColumnNames().
final int timeColumn =
columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME).getInt(0);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index 7bb57c5e1c2..12bd3d9d119 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -300,17 +300,18 @@ public class MSQControllerTask extends AbstractTask
implements ClientTaskQuery,
}
/**
- * Returns true if the task reads from the same table as the destionation.
In this case, we would prefer to fail
+ * Returns true if the task reads from the same table as the destination. In
this case, we would prefer to fail
* instead of reading any unused segments to ensure that old data is not
read.
*/
- public static boolean isReplaceInputDataSourceTask(MSQControllerTask task)
+ public static boolean isReplaceInputDataSourceTask(MSQSpec querySpec)
{
- return task.getQuerySpec()
- .getQuery()
- .getDataSource()
- .getTableNames()
- .stream()
- .anyMatch(datasouce -> task.getDataSource().equals(datasouce));
+ if (isIngestion(querySpec)) {
+ final String targetDataSource = ((DataSourceMSQDestination)
querySpec.getDestination()).getDataSource();
+ final Set<String> sourceTableNames =
querySpec.getQuery().getDataSource().getTableNames();
+ return sourceTableNames.contains(targetDataSource);
+ } else {
+ return false;
+ }
}
public static boolean writeResultsToDurableStorage(final MSQSpec querySpec)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 4d776260798..8cc34547f99 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -281,6 +281,8 @@ public class MSQTaskQueryMaker implements QueryMaker
.tuningConfig(new MSQTuningConfig(maxNumWorkers,
maxRowsInMemory, rowsPerSegment, indexSpec))
.build();
+ MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
+
final MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
querySpec.withOverriddenContext(nativeQueryContext),
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
index 3a2de0da96a..ae1558babf5 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
@@ -20,8 +20,13 @@
package org.apache.druid.msq.util;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.indexing.MSQSpec;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.segment.column.ColumnHolder;
import java.util.Collection;
@@ -90,4 +95,23 @@ public class MSQTaskQueryMakerUtils
throw new IAE("Segment sort order must begin with column [%s]",
ColumnHolder.TIME_COLUMN_NAME);
}
}
+
+ /**
+ * Validates that a query does not read from a datasource that it is
ingesting data into, if realtime segments are
+ * being queried.
+ */
+ public static void validateRealtimeReindex(final MSQSpec querySpec)
+ {
+ final SegmentSource segmentSources =
MultiStageQueryContext.getSegmentSources(querySpec.getQuery().context());
+ if (MSQControllerTask.isReplaceInputDataSourceTask(querySpec) &&
SegmentSource.REALTIME.equals(segmentSources)) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("Cannot ingest into datasource[%s] since it
is also being queried from, with "
+ + "REALTIME segments included. Ingest to a
different datasource, or disable querying "
+ + "of realtime segments by modifying [%s] in
the query context.",
+ ((DataSourceMSQDestination)
querySpec.getDestination()).getDataSource(),
+
MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE
+ );
+ }
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 9a4fb98666b..b4e8ef8745e 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -29,6 +29,8 @@ import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -56,6 +58,7 @@ import
org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
@@ -1824,6 +1827,35 @@ public class MSQReplaceTest extends MSQTestBase
.verifyResults();
}
+ @Test
+ void testRealtimeQueryWithReindexShouldThrowException()
+ {
+ Map<String, Object> context = ImmutableMap.<String, Object>builder()
+ .putAll(DEFAULT_MSQ_CONTEXT)
+
.put(MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE,
SegmentSource.REALTIME.name())
+ .build();
+
+ testIngestQuery().setSql(
+ "REPLACE INTO foo"
+ + " OVERWRITE ALL"
+ + " SELECT *"
+ + " FROM foo"
+ + " PARTITIONED BY DAY")
+ .setQueryContext(context)
+ .setExpectedValidationErrorMatcher(
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "general"
+ ).expectMessageContains(
+ "Cannot ingest into datasource[foo] since it is
also being queried from, with REALTIME "
+ + "segments included. Ingest to a different
datasource, or disable querying of realtime "
+ + "segments by modifying [includeSegmentSource]
in the query context.")
+ )
+ .verifyPlanningErrors();
+
+ }
+
@Nonnull
private Set<SegmentId> expectedFooSegments()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]