This is an automated email from the ASF dual-hosted git repository.

adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 269e035e768 Add validation for reindex with realtime sources (#16390)
269e035e768 is described below

commit 269e035e76850e18b125dd8cce77ca90c7f93730
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue May 7 10:32:15 2024 +0530

    Add validation for reindex with realtime sources (#16390)
    
    Add validation for reindex with realtime sources.
    
    With the addition of concurrent compaction, an MSQ job can query realtime
    sources while ingesting into the same datasource. This can cause problems
    if an MSQ job replaces the interval being ingested into after having
    queried only some of the data from the realtime task.
    
    This PR adds validation that rejects a query that reads from the
    datasource it ingests into when the query includes realtime sources.
---
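The check described in the commit message can be sketched in isolation as
follows. This is a minimal sketch, not the code from the patch: the class
RealtimeReindexSketch, its nested SegmentSource enum, and the
(destination, inputs, context) signature are hypothetical stand-ins so the
example compiles without Druid on the classpath, and the "reads from the
same table as the destination" test is reduced to a set lookup, which is an
assumption about what isReplaceInputDataSourceTask covers. Only the context
key includeSegmentSource and the values NONE and REALTIME come from the
patch itself.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical, Druid-free sketch of the validation added by this commit.
final class RealtimeReindexSketch {
    // Stand-in for org.apache.druid.msq.exec.SegmentSource (assumption: two values).
    enum SegmentSource { NONE, REALTIME }

    // Context key named in the docs change and in the test's expected message.
    static final String CTX_INCLUDE_SEGMENT_SOURCE = "includeSegmentSource";

    // Rejects a query that writes to a datasource it also reads from
    // when realtime segments are included in the read.
    public static void validateRealtimeReindex(
        String destination, Set<String> inputs, Map<String, Object> context) {
        // Default to NONE when the key is absent, matching the documented default.
        Object raw = context.getOrDefault(CTX_INCLUDE_SEGMENT_SOURCE, "NONE");
        SegmentSource source =
            SegmentSource.valueOf(String.valueOf(raw).toUpperCase(Locale.ROOT));

        if (inputs.contains(destination) && source == SegmentSource.REALTIME) {
            throw new IllegalArgumentException(String.format(
                "Cannot ingest into datasource[%s] since it is also being queried from, "
                + "with REALTIME segments included. Ingest to a different datasource, or "
                + "disable querying of realtime segments by modifying [%s] in the query context.",
                destination, CTX_INCLUDE_SEGMENT_SOURCE));
        }
    }

    public static void main(String[] args) {
        // Same datasource on both sides, realtime excluded: accepted.
        validateRealtimeReindex("foo", Set.of("foo"), Map.of());
        // Different destination with realtime included: accepted.
        validateRealtimeReindex("bar", Set.of("foo"),
            Map.of(CTX_INCLUDE_SEGMENT_SOURCE, "REALTIME"));
        // Same datasource with realtime included: rejected.
        try {
            validateRealtimeReindex("foo", Set.of("foo"),
                Map.of(CTX_INCLUDE_SEGMENT_SOURCE, "REALTIME"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The patch calls the real check from both MSQTaskQueryMaker and
ControllerImpl, so the same condition is evaluated at SQL planning time and
again when the controller starts.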
 docs/multi-stage-query/reference.md                |  2 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  3 ++
 .../druid/msq/indexing/MSQControllerTask.java      |  2 +-
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |  2 ++
 .../druid/msq/util/MSQTaskQueryMakerUtils.java     | 24 ++++++++++++++++
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  | 32 ++++++++++++++++++++++
 6 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index 20d0ef2f7e6..aeb4305f31e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -405,7 +405,7 @@ The following table lists the context parameters for the 
MSQ task engine:
 | `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on 
fault tolerance mode or not. Failed workers are retried based on 
[Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly 
set to false. | `false` |
 | `selectDestination` | SELECT<br /><br /> Controls where the final result of 
the select query is written. <br />Use `taskReport`(the default) to write 
select results to the task report. <b> This is not scalable since task reports 
size explodes for large results </b> <br/>Use `durableStorage` to write results 
to durable storage location. <b>For large results sets, its recommended to use 
`durableStorage` </b>. To configure durable storage see 
[`this`](#durable-storage) section. | `taskReport` |
 | `waitUntilSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest 
query waits for the generated segment to be loaded before exiting, else the 
ingest query exits without waiting. The task and live reports contain the 
information about the status of loading segments if this flag is set. This will 
ensure that any future queries made after the ingestion exits will include 
results from the ingestion. The drawback is that the controller task will stall 
till the segments are loaded. |  [...]
-| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the 
sources, which will be queried for results in addition to the segments present 
on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only 
non-realtime (published and used) segments will be downloaded from deep 
storage. If this value is `REALTIME`, results will also be included from 
realtime tasks. | `NONE` |
+| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the 
sources, which will be queried for results in addition to the segments present 
on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only 
non-realtime (published and used) segments will be downloaded from deep 
storage. If this value is `REALTIME`, results will also be included from 
realtime tasks. `REALTIME` cannot be used while writing data into the same 
datasource it is read from.| `NONE` |
 | `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The 
actual number of rows per page may be somewhat higher or lower than this 
number. In most cases, use the default.<br /> This property comes into effect 
only when `selectDestination` is set to `durableStorage` | 100000 |
 | `skipTypeVerification` | INSERT or REPLACE<br /><br />During query 
validation, Druid validates that [string arrays](../querying/arrays.md) and 
[multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed 
in the same column. If you are intentionally migrating from one to the other, 
use this context parameter to disable type validation.<br /><br />Provide the 
column list as comma-separated values or as a JSON array in string form.| empty 
list |
 | `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the 
default), an INSERT query generating no output rows will be no-op, and a 
REPLACE query generating no output rows will delete all data that matches the 
OVERWRITE clause.  When set to true, an ingest query generating no output rows 
will throw an `InsertCannotBeEmpty` fault. | `false` |
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b10fbe76ecf..db7c6838ba7 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -171,6 +171,7 @@ import org.apache.druid.msq.util.ArrayIngestMode;
 import org.apache.druid.msq.util.DimensionSchemaUtils;
 import org.apache.druid.msq.util.IntervalUtils;
 import org.apache.druid.msq.util.MSQFutureUtils;
+import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.msq.util.PassthroughAggregatorFactory;
 import org.apache.druid.query.Query;
@@ -1691,6 +1692,8 @@ public class ControllerImpl implements Controller
         throw new ISE("Column names are not unique: [%s]", 
columnMappings.getOutputColumnNames());
       }
 
+      MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
+
       if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME)) {
         // We know there's a single time column, because we've checked 
columnMappings.hasUniqueOutputColumnNames().
         final int timeColumn = 
columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME).getInt(0);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index e645e0e62cd..2a435215144 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -315,7 +315,7 @@ public class MSQControllerTask extends AbstractTask 
implements ClientTaskQuery,
   }
 
   /**
-   * Returns true if the task reads from the same table as the destionation. 
In this case, we would prefer to fail
+   * Returns true if the task reads from the same table as the destination. In 
this case, we would prefer to fail
    * instead of reading any unused segments to ensure that old data is not 
read.
    */
   public static boolean isReplaceInputDataSourceTask(MSQSpec querySpec)
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 4d776260798..8cc34547f99 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -281,6 +281,8 @@ public class MSQTaskQueryMaker implements QueryMaker
                .tuningConfig(new MSQTuningConfig(maxNumWorkers, 
maxRowsInMemory, rowsPerSegment, indexSpec))
                .build();
 
+    MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
+
     final MSQControllerTask controllerTask = new MSQControllerTask(
         taskId,
         querySpec.withOverriddenContext(nativeQueryContext),
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
index 3a2de0da96a..ae1558babf5 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
@@ -20,8 +20,13 @@
 package org.apache.druid.msq.util;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.indexing.MSQSpec;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
 import org.apache.druid.segment.column.ColumnHolder;
 
 import java.util.Collection;
@@ -90,4 +95,23 @@ public class MSQTaskQueryMakerUtils
       throw new IAE("Segment sort order must begin with column [%s]", 
ColumnHolder.TIME_COLUMN_NAME);
     }
   }
+
+  /**
+   * Validates that a query does not read from a datasource that it is 
ingesting data into, if realtime segments are
+   * being queried.
+   */
+  public static void validateRealtimeReindex(final MSQSpec querySpec)
+  {
+    final SegmentSource segmentSources = 
MultiStageQueryContext.getSegmentSources(querySpec.getQuery().context());
+    if (MSQControllerTask.isReplaceInputDataSourceTask(querySpec) && 
SegmentSource.REALTIME.equals(segmentSources)) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Cannot ingest into datasource[%s] since it 
is also being queried from, with "
+                                 + "REALTIME segments included. Ingest to a 
different datasource, or disable querying "
+                                 + "of realtime segments by modifying [%s] in 
the query context.",
+                                 ((DataSourceMSQDestination) 
querySpec.getDestination()).getDataSource(),
+                                 
MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE
+                          );
+    }
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 227a9656a14..174e09243ae 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -29,6 +29,8 @@ import org.apache.druid.data.input.impl.DoubleDimensionSchema;
 import org.apache.druid.data.input.impl.FloatDimensionSchema;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -57,6 +59,7 @@ import 
org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentMatcher;
@@ -1840,6 +1843,35 @@ public class MSQReplaceTest extends MSQTestBase
                      .verifyResults();
   }
 
+  @Test
+  void testRealtimeQueryWithReindexShouldThrowException()
+  {
+    Map<String, Object> context = ImmutableMap.<String, Object>builder()
+                                              .putAll(DEFAULT_MSQ_CONTEXT)
+                                              
.put(MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE, 
SegmentSource.REALTIME.name())
+                                              .build();
+
+    testIngestQuery().setSql(
+                         "REPLACE INTO foo"
+                         + " OVERWRITE ALL"
+                         + " SELECT *"
+                         + " FROM foo"
+                         + " PARTITIONED BY DAY")
+                     .setQueryContext(context)
+                     .setExpectedValidationErrorMatcher(
+                         new DruidExceptionMatcher(
+                             DruidException.Persona.USER,
+                             DruidException.Category.INVALID_INPUT,
+                             "general"
+                         ).expectMessageContains(
+                             "Cannot ingest into datasource[foo] since it is 
also being queried from, with REALTIME "
+                             + "segments included. Ingest to a different 
datasource, or disable querying of realtime "
+                             + "segments by modifying [includeSegmentSource] 
in the query context.")
+                     )
+                     .verifyPlanningErrors();
+
+  }
+
   @Nonnull
   private Set<SegmentId> expectedFooSegments()
   {


