This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 953ce794396 Add undocumented taskLockType to MSQ. (#15168)
953ce794396 is described below

commit 953ce794396dfc9216e8d6f98c09a14cb9b3caff
Author: Karan Kumar <[email protected]>
AuthorDate: Tue Oct 17 21:44:04 2023 +0530

    Add undocumented taskLockType to MSQ. (#15168)
    
    Patch adds an undocumented parameter taskLockType to MSQ so that we can 
start enabling this feature for users who are interested in testing the new 
lock types.
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 53 ++++++++++++++++---
 .../druid/msq/indexing/MSQControllerTask.java      | 13 ++++-
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |  5 +-
 .../druid/msq/util/MultiStageQueryContext.java     | 58 +++++++++++++++++++++
 .../org/apache/druid/msq/exec/MSQFaultsTest.java   | 59 +++++++++++++++++++---
 .../org/apache/druid/msq/exec/MSQInsertTest.java   | 16 +++++-
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  | 18 ++++++-
 .../org/apache/druid/msq/exec/MSQSelectTest.java   |  5 +-
 .../druid/msq/test/MSQTestTaskActionClient.java    | 11 +++-
 .../common/task/AbstractBatchIndexTask.java        | 12 +++--
 10 files changed, 226 insertions(+), 24 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c108c7d679e..f2260b055a9 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -47,6 +47,7 @@ import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.discovery.BrokerClient;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
 import org.apache.druid.frame.channel.FrameChannelSequence;
 import org.apache.druid.frame.key.ClusterBy;
@@ -69,7 +70,10 @@ import 
org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.LockReleaseAction;
 import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -962,7 +966,11 @@ public class ControllerImpl implements Controller
       );
     } else {
       final RowKeyReader keyReader = clusterBy.keyReader(signature);
-      return generateSegmentIdsWithShardSpecsForAppend(destination, 
partitionBoundaries, keyReader);
+      return generateSegmentIdsWithShardSpecsForAppend(
+          destination,
+          partitionBoundaries,
+          keyReader,
+          
MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(task.getQuerySpec().getQuery().getContext()),
 false));
     }
   }
 
@@ -972,7 +980,8 @@ public class ControllerImpl implements Controller
   private List<SegmentIdWithShardSpec> 
generateSegmentIdsWithShardSpecsForAppend(
       final DataSourceMSQDestination destination,
       final ClusterByPartitions partitionBoundaries,
-      final RowKeyReader keyReader
+      final RowKeyReader keyReader,
+      final TaskLockType taskLockType
   ) throws IOException
   {
     final Granularity segmentGranularity = destination.getSegmentGranularity();
@@ -998,7 +1007,7 @@ public class ControllerImpl implements Controller
                 false,
                 NumberedPartialShardSpec.instance(),
                 LockGranularity.TIME_CHUNK,
-                TaskLockType.SHARED
+                taskLockType
             )
         );
       }
@@ -1399,6 +1408,10 @@ public class ControllerImpl implements Controller
         (DataSourceMSQDestination) task.getQuerySpec().getDestination();
     final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
     int numTombstones = 0;
+    final TaskLockType taskLockType = 
MultiStageQueryContext.validateAndGetTaskLockType(
+        QueryContext.of(task.getQuerySpec().getQuery().getContext()),
+        destination.isReplaceTimeChunks()
+    );
 
     if (destination.isReplaceTimeChunks()) {
       final List<Interval> intervalsToDrop = 
findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
@@ -1441,7 +1454,7 @@ public class ControllerImpl implements Controller
         }
         performSegmentPublish(
             context.taskActionClient(),
-            SegmentTransactionalInsertAction.overwriteAction(null, 
segmentsWithTombstones)
+            createOverwriteAction(taskLockType, segmentsWithTombstones)
         );
       }
     } else if (!segments.isEmpty()) {
@@ -1458,7 +1471,7 @@ public class ControllerImpl implements Controller
       // Append mode.
       performSegmentPublish(
           context.taskActionClient(),
-          SegmentTransactionalInsertAction.appendAction(segments, null, null)
+          createAppendAction(segments, taskLockType)
       );
     }
 
@@ -1467,6 +1480,34 @@ public class ControllerImpl implements Controller
     task.emitMetric(context.emitter(), "ingest/segments/count", 
segmentsWithTombstones.size());
   }
 
+  private static TaskAction<SegmentPublishResult> createAppendAction(
+      Set<DataSegment> segments,
+      TaskLockType taskLockType
+  )
+  {
+    if (taskLockType.equals(TaskLockType.APPEND)) {
+      return SegmentTransactionalAppendAction.forSegments(segments);
+    } else if (taskLockType.equals(TaskLockType.SHARED)) {
+      return SegmentTransactionalInsertAction.appendAction(segments, null, 
null);
+    } else {
+      throw DruidException.defensive("Invalid lock type [%s] received for 
append action", taskLockType);
+    }
+  }
+
+  private TaskAction<SegmentPublishResult> createOverwriteAction(
+      TaskLockType taskLockType,
+      Set<DataSegment> segmentsWithTombstones
+  )
+  {
+    if (taskLockType.equals(TaskLockType.REPLACE)) {
+      return SegmentTransactionalReplaceAction.create(segmentsWithTombstones);
+    } else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) {
+      return SegmentTransactionalInsertAction.overwriteAction(null, 
segmentsWithTombstones);
+    } else {
+      throw DruidException.defensive("Invalid lock type [%s] received for 
overwrite action", taskLockType);
+    }
+  }
+
   /**
    * When doing an ingestion with {@link 
DataSourceMSQDestination#isReplaceTimeChunks()}, finds intervals
    * containing data that should be dropped.
@@ -2282,7 +2323,7 @@ public class ControllerImpl implements Controller
    */
   static void performSegmentPublish(
       final TaskActionClient client,
-      final SegmentTransactionalInsertAction action
+      final TaskAction<SegmentPublishResult> action
   ) throws IOException
   {
     try {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index 43967e7d748..3cdf706ba16 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -51,6 +51,8 @@ import org.apache.druid.msq.exec.MSQTasks;
 import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
 import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
 import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.rpc.indexing.OverlordClient;
@@ -204,12 +206,19 @@ public class MSQControllerTask extends AbstractTask 
implements ClientTaskQuery
   {
     // If we're in replace mode, acquire locks for all intervals before 
declaring the task ready.
     if (isIngestion(querySpec) && ((DataSourceMSQDestination) 
querySpec.getDestination()).isReplaceTimeChunks()) {
+      final TaskLockType taskLockType =
+          
MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(querySpec.getQuery().getContext()),
 true);
       final List<Interval> intervals =
           ((DataSourceMSQDestination) 
querySpec.getDestination()).getReplaceTimeChunks();
-      log.debug("Task[%s] trying to acquire[%s] locks for intervals[%s] to 
become ready", getId(), TaskLockType.EXCLUSIVE, intervals);
+      log.debug(
+          "Task[%s] trying to acquire[%s] locks for intervals[%s] to become 
ready",
+          getId(),
+          taskLockType,
+          intervals
+      );
       for (final Interval interval : intervals) {
         final TaskLock taskLock =
-            taskActionClient.submit(new 
TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
+            taskActionClient.submit(new 
TimeChunkLockTryAcquireAction(taskLockType, interval));
 
         if (taskLock == null) {
           return false;
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index de48387db20..d38fa1a8dc6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -226,12 +226,15 @@ public class MSQTaskQueryMaker implements QueryMaker
           fieldMapping.stream().map(f -> f.right).collect(Collectors.toList())
       );
 
-      destination = new DataSourceMSQDestination(
+      final DataSourceMSQDestination dataSourceMSQDestination = new 
DataSourceMSQDestination(
           targetDataSource,
           segmentGranularityObject,
           segmentSortOrder,
           replaceTimeChunks
       );
+      MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext,
+                                                        
dataSourceMSQDestination.isReplaceTimeChunks());
+      destination = dataSourceMSQDestination;
     } else {
       final MSQSelectDestination msqSelectDestination = 
MultiStageQueryContext.getSelectDestination(sqlQueryContext);
       if (msqSelectDestination.equals(MSQSelectDestination.TASKREPORT)) {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 77b11a28768..f6caa6da059 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -25,6 +25,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.opencsv.RFC4180Parser;
 import com.opencsv.RFC4180ParserBuilder;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
 import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.SegmentSource;
@@ -78,6 +82,11 @@ import java.util.stream.Collectors;
  * ingested via MSQ. If set to 'none', arrays are not allowed to be ingested 
in MSQ. If set to 'array', array types
  * can be ingested as expected. If set to 'mvd', numeric arrays can not be 
ingested, and string arrays will be
  * ingested as MVDs (this is kept for legacy purpose).
+ *
+ * <li><b>taskLockType</b>: Temporary flag to allow MSQ to use experimental 
lock types. Valid values are present in
+ * {@link TaskLockType}. If the flag is not set, msq uses {@link 
TaskLockType#EXCLUSIVE} for replace queries and
+ * {@link TaskLockType#SHARED} for insert queries.
+ *
  * </ol>
  **/
 public class MultiStageQueryContext
@@ -350,4 +359,53 @@ public class MultiStageQueryContext
       throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec", 
indexSpecObject);
     }
   }
+
+  /**
+   * This method is used to validate and get the taskLockType from the 
queryContext.
+   * If the queryContext does not contain the taskLockType, then {@link 
TaskLockType#EXCLUSIVE} is used for replace queries and
+   * {@link TaskLockType#SHARED} is used for insert queries.
+   * If the queryContext contains the taskLockType, then it is validated and 
returned.
+   */
+  public static TaskLockType validateAndGetTaskLockType(QueryContext 
queryContext, boolean isReplaceQuery)
+  {
+    final TaskLockType taskLockType = QueryContexts.getAsEnum(
+        Tasks.TASK_LOCK_TYPE,
+        queryContext.getString(Tasks.TASK_LOCK_TYPE, null),
+        TaskLockType.class
+    );
+    if (taskLockType == null) {
+      if (isReplaceQuery) {
+        return TaskLockType.EXCLUSIVE;
+      } else {
+        return TaskLockType.SHARED;
+      }
+    }
+    final String appendErrorMessage = StringUtils.format(
+        " Please use [%s] key in the context parameter and use one of the 
TaskLock types as mentioned earlier or "
+        + "remove this key for automatic lock type selection", 
Tasks.TASK_LOCK_TYPE);
+
+    if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) || 
taskLockType.equals(TaskLockType.REPLACE))) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build(
+                              "TaskLock must be of type [%s] or [%s] for a 
REPLACE query. Found invalid type [%s] set."
+                              + appendErrorMessage,
+                              TaskLockType.EXCLUSIVE,
+                              TaskLockType.REPLACE,
+                              taskLockType
+                          );
+    }
+    if (!isReplaceQuery && !(taskLockType.equals(TaskLockType.SHARED) || 
taskLockType.equals(TaskLockType.APPEND))) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build(
+                              "TaskLock must be of type [%s] or [%s] for an 
INSERT query. Found invalid type [%s] set."
+                              + appendErrorMessage,
+                              TaskLockType.SHARED,
+                              TaskLockType.APPEND,
+                              taskLockType
+                          );
+    }
+    return taskLockType;
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index 4b77dd78b33..612dee3bbd8 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.exec;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
@@ -45,6 +47,7 @@ import org.mockito.Mockito;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -175,10 +178,10 @@ public class MSQFaultsTest extends MSQTestBase
                     .build();
 
     final String sql = "INSERT INTO foo1\n"
-                     + "SELECT TIME_PARSE(dim1) AS __time, dim1 as cnt\n"
-                     + "FROM foo\n"
-                     + "PARTITIONED BY DAY\n"
-                     + "CLUSTERED BY dim1";
+                       + "SELECT TIME_PARSE(dim1) AS __time, dim1 as cnt\n"
+                       + "FROM foo\n"
+                       + "PARTITIONED BY DAY\n"
+                       + "CLUSTERED BY dim1";
 
     testIngestQuery()
         .setSql(sql)
@@ -349,8 +352,9 @@ public class MSQFaultsTest extends MSQTestBase
                 DruidException.Persona.ADMIN,
                 DruidException.Category.INVALID_INPUT,
                 "general"
-            ).expectMessageContains("SQL requires union between two tables and 
column names queried for each table are different "
-                              + "Left: [dim2, dim1, m1], Right: [dim1, dim2, 
m1]."))
+            ).expectMessageContains(
+                "SQL requires union between two tables and column names 
queried for each table are different "
+                + "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."))
         .verifyPlanningErrors();
   }
 
@@ -374,4 +378,47 @@ public class MSQFaultsTest extends MSQTestBase
                 "SQL requires union between inputs that are not simple table 
scans and involve a filter or aliasing"))
         .verifyPlanningErrors();
   }
+
+  @Test
+  public void testInsertWithReplaceAndExcludeLocks()
+  {
+    for (TaskLockType taskLockType : new 
TaskLockType[]{TaskLockType.EXCLUSIVE, TaskLockType.REPLACE}) {
+      testLockTypes(
+          taskLockType,
+          "INSERT INTO foo1 select * from foo partitioned by day",
+          "TaskLock must be of type [SHARED] or [APPEND] for an INSERT query"
+      );
+    }
+  }
+
+  @Test
+  public void testReplaceWithAppendAndSharedLocks()
+  {
+    for (TaskLockType taskLockType : new TaskLockType[]{TaskLockType.APPEND, 
TaskLockType.SHARED}) {
+      testLockTypes(
+          taskLockType,
+          "REPLACE INTO foo1 overwrite ALL select * from foo partitioned by 
day",
+          "TaskLock must be of type [EXCLUSIVE] or [REPLACE] for a REPLACE 
query"
+      );
+    }
+  }
+
+  private void testLockTypes(TaskLockType contextTaskLockType, String sql, 
String errorMessage)
+  {
+    Map<String, Object> context = new HashMap<>(DEFAULT_MSQ_CONTEXT);
+    context.put(Tasks.TASK_LOCK_TYPE, contextTaskLockType.name());
+    testIngestQuery()
+        .setSql(
+            sql
+        )
+        .setQueryContext(context)
+        .setExpectedValidationErrorMatcher(
+            new DruidExceptionMatcher(
+                DruidException.Persona.USER,
+                DruidException.Category.INVALID_INPUT,
+                "general"
+            ).expectMessageContains(
+                errorMessage))
+        .verifyPlanningErrors();
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index b43dd72e88c..2314c10d7e6 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -28,6 +28,8 @@ import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -59,6 +61,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -67,6 +70,16 @@ import java.util.TreeSet;
 @RunWith(Parameterized.class)
 public class MSQInsertTest extends MSQTestBase
 {
+
+  private static final String WITH_APPEND_LOCK = "WITH_APPEND_LOCK";
+  private static final Map<String, Object> QUERY_CONTEXT_WITH_APPEND_LOCK =
+      ImmutableMap.<String, Object>builder()
+                  .putAll(DEFAULT_MSQ_CONTEXT)
+                  .put(
+                      Tasks.TASK_LOCK_TYPE,
+                      TaskLockType.APPEND.name().toLowerCase(Locale.ENGLISH)
+                  )
+                  .build();
   private final HashFunction fn = Hashing.murmur3_128();
 
   @Parameterized.Parameters(name = "{index}:with context {0}")
@@ -76,7 +89,8 @@ public class MSQInsertTest extends MSQTestBase
         {DEFAULT, DEFAULT_MSQ_CONTEXT},
         {DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
         {FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
-        {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
+        {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
+        {WITH_APPEND_LOCK, QUERY_CONTEXT_WITH_APPEND_LOCK}
     };
     return Arrays.asList(data);
   }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 0a43fdaea72..144e74b084e 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -20,10 +20,14 @@
 package org.apache.druid.msq.exec;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.msq.test.CounterSnapshotMatcher;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestFileUtils;
@@ -54,6 +58,17 @@ import java.util.TreeSet;
 @RunWith(Parameterized.class)
 public class MSQReplaceTest extends MSQTestBase
 {
+
+  private static final String WITH_REPLACE_LOCK = "WITH_REPLACE_LOCK";
+  private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK =
+      ImmutableMap.<String, Object>builder()
+                  .putAll(DEFAULT_MSQ_CONTEXT)
+                  .put(
+                      Tasks.TASK_LOCK_TYPE,
+                      StringUtils.toLowerCase(TaskLockType.REPLACE.name())
+                  )
+                  .build();
+
   @Parameterized.Parameters(name = "{index}:with context {0}")
   public static Collection<Object[]> data()
   {
@@ -61,7 +76,8 @@ public class MSQReplaceTest extends MSQTestBase
         {DEFAULT, DEFAULT_MSQ_CONTEXT},
         {DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
         {FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
-        {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
+        {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
+        {WITH_REPLACE_LOCK, QUERY_CONTEXT_WITH_REPLACE_LOCK}
     };
     return Arrays.asList(data);
   }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index 219af6a3188..b63ee479e20 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -97,7 +97,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 
 @RunWith(Parameterized.class)
@@ -114,7 +113,7 @@ public class MSQSelectTest extends MSQTestBase
                   .put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 2)
                   .put(
                       MultiStageQueryContext.CTX_SELECT_DESTINATION,
-                      
MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH)
+                      
StringUtils.toLowerCase(MSQSelectDestination.DURABLESTORAGE.getName())
                   )
                   .build();
 
@@ -124,7 +123,7 @@ public class MSQSelectTest extends MSQTestBase
                   .putAll(DEFAULT_MSQ_CONTEXT)
                   .put(
                       MultiStageQueryContext.CTX_SELECT_DESTINATION,
-                      
MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH)
+                      
StringUtils.toLowerCase(MSQSelectDestination.DURABLESTORAGE.getName())
                   )
                   .build();
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
index 897e57c93bc..31b3272b74f 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
@@ -28,7 +28,9 @@ import org.apache.druid.indexing.common.TimeChunkLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -121,10 +123,17 @@ public class MSQTestTaskActionClient implements 
TaskActionClient
                                      ).collect(Collectors.toSet());
       }
     } else if (taskAction instanceof SegmentTransactionalInsertAction) {
-      // Always OK.
       final Set<DataSegment> segments = ((SegmentTransactionalInsertAction) 
taskAction).getSegments();
       publishedSegments.addAll(segments);
       return (RetType) SegmentPublishResult.ok(segments);
+    } else if (taskAction instanceof SegmentTransactionalReplaceAction) {
+      final Set<DataSegment> segments = ((SegmentTransactionalReplaceAction) 
taskAction).getSegments();
+      publishedSegments.addAll(segments);
+      return (RetType) SegmentPublishResult.ok(segments);
+    } else if (taskAction instanceof SegmentTransactionalAppendAction) {
+      final Set<DataSegment> segments = ((SegmentTransactionalAppendAction) 
taskAction).getSegments();
+      publishedSegments.addAll(segments);
+      return (RetType) SegmentPublishResult.ok(segments);
     } else {
       return null;
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index fe19b35391e..cbc469b6133 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -62,6 +62,7 @@ import 
org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -483,13 +484,18 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
       return TaskLockType.EXCLUSIVE;
     }
 
-    final String contextLockType = getContextValue(Tasks.TASK_LOCK_TYPE);
+    final TaskLockType contextTaskLockType = QueryContexts.getAsEnum(
+        Tasks.TASK_LOCK_TYPE,
+        getContextValue(Tasks.TASK_LOCK_TYPE),
+        TaskLockType.class
+    );
+
     final TaskLockType lockType;
-    if (contextLockType == null) {
+    if (contextTaskLockType == null) {
       lockType = getContextValue(Tasks.USE_SHARED_LOCK, false)
                  ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
     } else {
-      lockType = TaskLockType.valueOf(contextLockType);
+      lockType = contextTaskLockType;
     }
 
     final IngestionMode ingestionMode = getIngestionMode();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to