This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/30.0.0 by this push:
     new cb1bc70d3f1 [Backport] Add validation for reindex with realtime sources (#16410)
cb1bc70d3f1 is described below

commit cb1bc70d3f1f204c2f1711692477ddc2829885ee
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu May 9 17:51:11 2024 +0530

    [Backport] Add validation for reindex with realtime sources (#16410)
    
    * Add validation for reindex with realtime sources (#16390)
    
    Add validation for reindex with realtime sources.
    
    With the addition of concurrent compaction, it is possible for MSQ to
    ingest data into a datasource while also querying that same datasource
    from realtime sources. This could lead to issues if the interval being
    ingested into is replaced by an MSQ job that has queried only some of
    the data from the realtime task.
    
    This PR adds validation to check that the datasource being ingested into
    is not being queried from, if the query includes realtime sources.
    
    * Fix conflicts
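Before the file-by-file patch, the essence of the new check can be summarized. The following is a minimal standalone sketch of the validation logic; the class, method, and exception choices are illustrative simplifications, not Druid's actual API (the real implementation lives in `MSQTaskQueryMakerUtils` and throws a `DruidException`):

```java
import java.util.Set;

// Minimal standalone sketch of the check: an ingest query that includes
// REALTIME segments must not write into a datasource it also reads from.
// (Illustrative only; not the actual Druid classes.)
public class RealtimeReindexValidation
{
  public static void validate(String targetDataSource, Set<String> sourceTables, String segmentSource)
  {
    // Fail only when both conditions hold: realtime segments are queried
    // AND the target datasource appears among the query's source tables.
    if ("REALTIME".equals(segmentSource) && sourceTables.contains(targetDataSource)) {
      throw new IllegalArgumentException(
          "Cannot ingest into datasource[" + targetDataSource + "] since it is also being queried from, "
          + "with REALTIME segments included."
      );
    }
  }
}
```

Reading from a different datasource, or querying with segment source `NONE`, passes the check unchanged.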
---
 docs/multi-stage-query/reference.md                |  2 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  5 +++-
 .../druid/msq/indexing/MSQControllerTask.java      | 17 ++++++------
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |  2 ++
 .../druid/msq/util/MSQTaskQueryMakerUtils.java     | 24 ++++++++++++++++
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  | 32 ++++++++++++++++++++++
 6 files changed, 72 insertions(+), 10 deletions(-)

diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 20d0ef2f7e6..aeb4305f31e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -405,7 +405,7 @@ The following table lists the context parameters for the MSQ task engine:
 | `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
 | `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport`(the default) to write select results to the task report. <b> This is not scalable since task reports size explodes for large results </b> <br/>Use `durableStorage` to write results to durable storage location. <b>For large results sets, its recommended to use `durableStorage` </b>. To configure durable storage see [`this`](#durable-storage) section. | `taskReport` |
 | `waitUntilSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest query waits for the generated segment to be loaded before exiting, else the ingest query exits without waiting. The task and live reports contain the information about the status of loading segments if this flag is set. This will ensure that any future queries made after the ingestion exits will include results from the ingestion. The drawback is that the controller task will stall till the segments are loaded. |  [...]
-| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the sources, which will be queried for results in addition to the segments present on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only non-realtime (published and used) segments will be downloaded from deep storage. If this value is `REALTIME`, results will also be included from realtime tasks. | `NONE` |
+| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the sources, which will be queried for results in addition to the segments present on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only non-realtime (published and used) segments will be downloaded from deep storage. If this value is `REALTIME`, results will also be included from realtime tasks. `REALTIME` cannot be used while writing data into the same datasource it is read from.| `NONE` |
 | `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default.<br /> This property comes into effect only when `selectDestination` is set to `durableStorage` | 100000 |
 | `skipTypeVerification` | INSERT or REPLACE<br /><br />During query validation, Druid validates that [string arrays](../querying/arrays.md) and [multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed in the same column. If you are intentionally migrating from one to the other, use this context parameter to disable type validation.<br /><br />Provide the column list as comma-separated values or as a JSON array in string form.| empty list |
 | `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the default), an INSERT query generating no output rows will be no-op, and a REPLACE query generating no output rows will delete all data that matches the OVERWRITE clause.  When set to true, an ingest query generating no output rows will throw an `InsertCannotBeEmpty` fault. | `false` |
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 682e2b484e4..83e53ee60ec 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -189,6 +189,7 @@ import org.apache.druid.msq.util.ArrayIngestMode;
 import org.apache.druid.msq.util.DimensionSchemaUtils;
 import org.apache.druid.msq.util.IntervalUtils;
 import org.apache.druid.msq.util.MSQFutureUtils;
+import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.msq.util.PassthroughAggregatorFactory;
 import org.apache.druid.msq.util.SqlStatementResourceHelper;
@@ -716,7 +717,7 @@ public class ControllerImpl implements Controller
 
     taskContextOverridesBuilder.put(
         MultiStageQueryContext.CTX_IS_REINDEX,
-        MSQControllerTask.isReplaceInputDataSourceTask(task)
+        MSQControllerTask.isReplaceInputDataSourceTask(task.getQuerySpec())
     );
 
     // propagate the controller's tags to the worker task for enhanced metrics reporting
@@ -1932,6 +1933,8 @@ public class ControllerImpl implements Controller
         throw new ISE("Column names are not unique: [%s]", columnMappings.getOutputColumnNames());
       }
 
+      MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
+
       if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME)) {
         // We know there's a single time column, because we've checked columnMappings.hasUniqueOutputColumnNames().
         final int timeColumn = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME).getInt(0);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index 7bb57c5e1c2..12bd3d9d119 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -300,17 +300,18 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery,
   }
 
   /**
-   * Returns true if the task reads from the same table as the destionation. In this case, we would prefer to fail
+   * Returns true if the task reads from the same table as the destination. In this case, we would prefer to fail
    * instead of reading any unused segments to ensure that old data is not read.
    */
-  public static boolean isReplaceInputDataSourceTask(MSQControllerTask task)
+  public static boolean isReplaceInputDataSourceTask(MSQSpec querySpec)
   {
-    return task.getQuerySpec()
-               .getQuery()
-               .getDataSource()
-               .getTableNames()
-               .stream()
-               .anyMatch(datasouce -> task.getDataSource().equals(datasouce));
+    if (isIngestion(querySpec)) {
+      final String targetDataSource = ((DataSourceMSQDestination) querySpec.getDestination()).getDataSource();
+      final Set<String> sourceTableNames = querySpec.getQuery().getDataSource().getTableNames();
+      return sourceTableNames.contains(targetDataSource);
+    } else {
+      return false;
+    }
   }
 
   public static boolean writeResultsToDurableStorage(final MSQSpec querySpec)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 4d776260798..8cc34547f99 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -281,6 +281,8 @@ public class MSQTaskQueryMaker implements QueryMaker
                .tuningConfig(new MSQTuningConfig(maxNumWorkers, maxRowsInMemory, rowsPerSegment, indexSpec))
                .build();
 
+    MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
+
     final MSQControllerTask controllerTask = new MSQControllerTask(
         taskId,
         querySpec.withOverriddenContext(nativeQueryContext),
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
index 3a2de0da96a..ae1558babf5 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
@@ -20,8 +20,13 @@
 package org.apache.druid.msq.util;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.indexing.MSQSpec;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
 import org.apache.druid.segment.column.ColumnHolder;
 
 import java.util.Collection;
@@ -90,4 +95,23 @@ public class MSQTaskQueryMakerUtils
       throw new IAE("Segment sort order must begin with column [%s]", ColumnHolder.TIME_COLUMN_NAME);
     }
   }
+
+  /**
+   * Validates that a query does not read from a datasource that it is ingesting data into, if realtime segments are
+   * being queried.
+   */
+  public static void validateRealtimeReindex(final MSQSpec querySpec)
+  {
+    final SegmentSource segmentSources = MultiStageQueryContext.getSegmentSources(querySpec.getQuery().context());
+    if (MSQControllerTask.isReplaceInputDataSourceTask(querySpec) && SegmentSource.REALTIME.equals(segmentSources)) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Cannot ingest into datasource[%s] since it is also being queried from, with "
+                                 + "REALTIME segments included. Ingest to a different datasource, or disable querying "
+                                 + "of realtime segments by modifying [%s] in the query context.",
+                                 ((DataSourceMSQDestination) querySpec.getDestination()).getDataSource(),
+                                 MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE
+                          );
+    }
+  }
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 9a4fb98666b..b4e8ef8745e 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -29,6 +29,8 @@ import org.apache.druid.data.input.impl.DoubleDimensionSchema;
 import org.apache.druid.data.input.impl.FloatDimensionSchema;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -56,6 +58,7 @@ import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentMatchers;
@@ -1824,6 +1827,35 @@ public class MSQReplaceTest extends MSQTestBase
                      .verifyResults();
   }
 
+  @Test
+  void testRealtimeQueryWithReindexShouldThrowException()
+  {
+    Map<String, Object> context = ImmutableMap.<String, Object>builder()
+                                              .putAll(DEFAULT_MSQ_CONTEXT)
+                                              .put(MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE, SegmentSource.REALTIME.name())
+                                              .build();
+
+    testIngestQuery().setSql(
+                         "REPLACE INTO foo"
+                         + " OVERWRITE ALL"
+                         + " SELECT *"
+                         + " FROM foo"
+                         + " PARTITIONED BY DAY")
+                     .setQueryContext(context)
+                     .setExpectedValidationErrorMatcher(
+                         new DruidExceptionMatcher(
+                             DruidException.Persona.USER,
+                             DruidException.Category.INVALID_INPUT,
+                             "general"
+                         ).expectMessageContains(
+                             "Cannot ingest into datasource[foo] since it is also being queried from, with REALTIME "
+                             + "segments included. Ingest to a different datasource, or disable querying of realtime "
+                             + "segments by modifying [includeSegmentSource] in the query context.")
+                     )
+                     .verifyPlanningErrors();
+
+  }
+
   @Nonnull
   private Set<SegmentId> expectedFooSegments()
   {

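For context on the user-facing knob exercised by the test above: `includeSegmentSource` is read from the query context and defaults to `NONE`. A minimal sketch of that resolution follows; the enum and helper here are illustrative stand-ins, not Druid's actual `MultiStageQueryContext`:

```java
import java.util.Locale;
import java.util.Map;

public class SegmentSourceContext
{
  // Illustrative stand-in for Druid's segment-source modes.
  public enum SegmentSource { NONE, REALTIME }

  // Resolve the mode from a query context map, defaulting to NONE
  // as documented for `includeSegmentSource`.
  public static SegmentSource resolve(Map<String, Object> queryContext)
  {
    final Object raw = queryContext.getOrDefault("includeSegmentSource", SegmentSource.NONE.name());
    return SegmentSource.valueOf(String.valueOf(raw).toUpperCase(Locale.ROOT));
  }
}
```

With no context entry the mode resolves to `NONE`, so only published segments are read and the new validation never fires; setting the key to `REALTIME` opts into the behavior the validation guards against.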
