This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 953ce794396 Add undocumented taskLockType to MSQ. (#15168)
953ce794396 is described below
commit 953ce794396dfc9216e8d6f98c09a14cb9b3caff
Author: Karan Kumar <[email protected]>
AuthorDate: Tue Oct 17 21:44:04 2023 +0530
Add undocumented taskLockType to MSQ. (#15168)
Patch adds an undocumented parameter taskLockType to MSQ so that we can
start enabling this feature for users who are interested in testing the new
lock types.
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 53 ++++++++++++++++---
.../druid/msq/indexing/MSQControllerTask.java | 13 ++++-
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 5 +-
.../druid/msq/util/MultiStageQueryContext.java | 58 +++++++++++++++++++++
.../org/apache/druid/msq/exec/MSQFaultsTest.java | 59 +++++++++++++++++++---
.../org/apache/druid/msq/exec/MSQInsertTest.java | 16 +++++-
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 18 ++++++-
.../org/apache/druid/msq/exec/MSQSelectTest.java | 5 +-
.../druid/msq/test/MSQTestTaskActionClient.java | 11 +++-
.../common/task/AbstractBatchIndexTask.java | 12 +++--
10 files changed, 226 insertions(+), 24 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c108c7d679e..f2260b055a9 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -47,6 +47,7 @@ import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.BrokerClient;
+import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.channel.FrameChannelSequence;
import org.apache.druid.frame.key.ClusterBy;
@@ -69,7 +70,10 @@ import
org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -962,7 +966,11 @@ public class ControllerImpl implements Controller
);
} else {
final RowKeyReader keyReader = clusterBy.keyReader(signature);
- return generateSegmentIdsWithShardSpecsForAppend(destination,
partitionBoundaries, keyReader);
+ return generateSegmentIdsWithShardSpecsForAppend(
+ destination,
+ partitionBoundaries,
+ keyReader,
+
MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(task.getQuerySpec().getQuery().getContext()),
false));
}
}
@@ -972,7 +980,8 @@ public class ControllerImpl implements Controller
private List<SegmentIdWithShardSpec>
generateSegmentIdsWithShardSpecsForAppend(
final DataSourceMSQDestination destination,
final ClusterByPartitions partitionBoundaries,
- final RowKeyReader keyReader
+ final RowKeyReader keyReader,
+ final TaskLockType taskLockType
) throws IOException
{
final Granularity segmentGranularity = destination.getSegmentGranularity();
@@ -998,7 +1007,7 @@ public class ControllerImpl implements Controller
false,
NumberedPartialShardSpec.instance(),
LockGranularity.TIME_CHUNK,
- TaskLockType.SHARED
+ taskLockType
)
);
}
@@ -1399,6 +1408,10 @@ public class ControllerImpl implements Controller
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
int numTombstones = 0;
+ final TaskLockType taskLockType =
MultiStageQueryContext.validateAndGetTaskLockType(
+ QueryContext.of(task.getQuerySpec().getQuery().getContext()),
+ destination.isReplaceTimeChunks()
+ );
if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop =
findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
@@ -1441,7 +1454,7 @@ public class ControllerImpl implements Controller
}
performSegmentPublish(
context.taskActionClient(),
- SegmentTransactionalInsertAction.overwriteAction(null,
segmentsWithTombstones)
+ createOverwriteAction(taskLockType, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
@@ -1458,7 +1471,7 @@ public class ControllerImpl implements Controller
// Append mode.
performSegmentPublish(
context.taskActionClient(),
- SegmentTransactionalInsertAction.appendAction(segments, null, null)
+ createAppendAction(segments, taskLockType)
);
}
@@ -1467,6 +1480,34 @@ public class ControllerImpl implements Controller
task.emitMetric(context.emitter(), "ingest/segments/count",
segmentsWithTombstones.size());
}
+ private static TaskAction<SegmentPublishResult> createAppendAction(
+ Set<DataSegment> segments,
+ TaskLockType taskLockType
+ )
+ {
+ if (taskLockType.equals(TaskLockType.APPEND)) {
+ return SegmentTransactionalAppendAction.forSegments(segments);
+ } else if (taskLockType.equals(TaskLockType.SHARED)) {
+ return SegmentTransactionalInsertAction.appendAction(segments, null,
null);
+ } else {
+ throw DruidException.defensive("Invalid lock type [%s] received for
append action", taskLockType);
+ }
+ }
+
+ private TaskAction<SegmentPublishResult> createOverwriteAction(
+ TaskLockType taskLockType,
+ Set<DataSegment> segmentsWithTombstones
+ )
+ {
+ if (taskLockType.equals(TaskLockType.REPLACE)) {
+ return SegmentTransactionalReplaceAction.create(segmentsWithTombstones);
+ } else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) {
+ return SegmentTransactionalInsertAction.overwriteAction(null,
segmentsWithTombstones);
+ } else {
+ throw DruidException.defensive("Invalid lock type [%s] received for
overwrite action", taskLockType);
+ }
+ }
+
/**
* When doing an ingestion with {@link
DataSourceMSQDestination#isReplaceTimeChunks()}, finds intervals
* containing data that should be dropped.
@@ -2282,7 +2323,7 @@ public class ControllerImpl implements Controller
*/
static void performSegmentPublish(
final TaskActionClient client,
- final SegmentTransactionalInsertAction action
+ final TaskAction<SegmentPublishResult> action
) throws IOException
{
try {
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index 43967e7d748..3cdf706ba16 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -51,6 +51,8 @@ import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
@@ -204,12 +206,19 @@ public class MSQControllerTask extends AbstractTask
implements ClientTaskQuery
{
// If we're in replace mode, acquire locks for all intervals before
declaring the task ready.
if (isIngestion(querySpec) && ((DataSourceMSQDestination)
querySpec.getDestination()).isReplaceTimeChunks()) {
+ final TaskLockType taskLockType =
+
MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(querySpec.getQuery().getContext()),
true);
final List<Interval> intervals =
((DataSourceMSQDestination)
querySpec.getDestination()).getReplaceTimeChunks();
- log.debug("Task[%s] trying to acquire[%s] locks for intervals[%s] to
become ready", getId(), TaskLockType.EXCLUSIVE, intervals);
+ log.debug(
+ "Task[%s] trying to acquire[%s] locks for intervals[%s] to become
ready",
+ getId(),
+ taskLockType,
+ intervals
+ );
for (final Interval interval : intervals) {
final TaskLock taskLock =
- taskActionClient.submit(new
TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
+ taskActionClient.submit(new
TimeChunkLockTryAcquireAction(taskLockType, interval));
if (taskLock == null) {
return false;
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index de48387db20..d38fa1a8dc6 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -226,12 +226,15 @@ public class MSQTaskQueryMaker implements QueryMaker
fieldMapping.stream().map(f -> f.right).collect(Collectors.toList())
);
- destination = new DataSourceMSQDestination(
+ final DataSourceMSQDestination dataSourceMSQDestination = new
DataSourceMSQDestination(
targetDataSource,
segmentGranularityObject,
segmentSortOrder,
replaceTimeChunks
);
+ MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext,
+
dataSourceMSQDestination.isReplaceTimeChunks());
+ destination = dataSourceMSQDestination;
} else {
final MSQSelectDestination msqSelectDestination =
MultiStageQueryContext.getSelectDestination(sqlQueryContext);
if (msqSelectDestination.equals(MSQSelectDestination.TASKREPORT)) {
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 77b11a28768..f6caa6da059 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -25,6 +25,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.SegmentSource;
@@ -78,6 +82,11 @@ import java.util.stream.Collectors;
* ingested via MSQ. If set to 'none', arrays are not allowed to be ingested
in MSQ. If set to 'array', array types
* can be ingested as expected. If set to 'mvd', numeric arrays can not be
ingested, and string arrays will be
* ingested as MVDs (this is kept for legacy purpose).
+ *
+ * <li><b>taskLockType</b>: Temporary flag to allow MSQ to use experimental
lock types. Valid values are present in
+ * {@link TaskLockType}. If the flag is not set, MSQ uses {@link
TaskLockType#EXCLUSIVE} for replace queries and
+ * {@link TaskLockType#SHARED} for insert queries.
+ *
* </ol>
**/
public class MultiStageQueryContext
@@ -350,4 +359,53 @@ public class MultiStageQueryContext
throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec",
indexSpecObject);
}
}
+
+ /**
+ * This method is used to validate and get the taskLockType from the
queryContext.
+ * If the queryContext does not contain the taskLockType, then {@link
TaskLockType#EXCLUSIVE} is used for replace queries and
+ * {@link TaskLockType#SHARED} is used for insert queries.
+ * If the queryContext contains the taskLockType, then it is validated and
returned.
+ */
+ public static TaskLockType validateAndGetTaskLockType(QueryContext
queryContext, boolean isReplaceQuery)
+ {
+ final TaskLockType taskLockType = QueryContexts.getAsEnum(
+ Tasks.TASK_LOCK_TYPE,
+ queryContext.getString(Tasks.TASK_LOCK_TYPE, null),
+ TaskLockType.class
+ );
+ if (taskLockType == null) {
+ if (isReplaceQuery) {
+ return TaskLockType.EXCLUSIVE;
+ } else {
+ return TaskLockType.SHARED;
+ }
+ }
+ final String appendErrorMessage = StringUtils.format(
+ " Please use [%s] key in the context parameter and use one of the
TaskLock types as mentioned earlier or "
+ + "remove this key for automatic lock type selection",
Tasks.TASK_LOCK_TYPE);
+
+ if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) ||
taskLockType.equals(TaskLockType.REPLACE))) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "TaskLock must be of type [%s] or [%s] for a
REPLACE query. Found invalid type [%s] set."
+ + appendErrorMessage,
+ TaskLockType.EXCLUSIVE,
+ TaskLockType.REPLACE,
+ taskLockType
+ );
+ }
+ if (!isReplaceQuery && !(taskLockType.equals(TaskLockType.SHARED) ||
taskLockType.equals(TaskLockType.APPEND))) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "TaskLock must be of type [%s] or [%s] for an
INSERT query. Found invalid type [%s] set."
+ + appendErrorMessage,
+ TaskLockType.SHARED,
+ TaskLockType.APPEND,
+ taskLockType
+ );
+ }
+ return taskLockType;
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index 4b77dd78b33..612dee3bbd8 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
@@ -45,6 +47,7 @@ import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -175,10 +178,10 @@ public class MSQFaultsTest extends MSQTestBase
.build();
final String sql = "INSERT INTO foo1\n"
- + "SELECT TIME_PARSE(dim1) AS __time, dim1 as cnt\n"
- + "FROM foo\n"
- + "PARTITIONED BY DAY\n"
- + "CLUSTERED BY dim1";
+ + "SELECT TIME_PARSE(dim1) AS __time, dim1 as cnt\n"
+ + "FROM foo\n"
+ + "PARTITIONED BY DAY\n"
+ + "CLUSTERED BY dim1";
testIngestQuery()
.setSql(sql)
@@ -349,8 +352,9 @@ public class MSQFaultsTest extends MSQTestBase
DruidException.Persona.ADMIN,
DruidException.Category.INVALID_INPUT,
"general"
- ).expectMessageContains("SQL requires union between two tables and
column names queried for each table are different "
- + "Left: [dim2, dim1, m1], Right: [dim1, dim2,
m1]."))
+ ).expectMessageContains(
+ "SQL requires union between two tables and column names
queried for each table are different "
+ + "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."))
.verifyPlanningErrors();
}
@@ -374,4 +378,47 @@ public class MSQFaultsTest extends MSQTestBase
"SQL requires union between inputs that are not simple table
scans and involve a filter or aliasing"))
.verifyPlanningErrors();
}
+
+ @Test
+ public void testInsertWithReplaceAndExcludeLocks()
+ {
+ for (TaskLockType taskLockType : new
TaskLockType[]{TaskLockType.EXCLUSIVE, TaskLockType.REPLACE}) {
+ testLockTypes(
+ taskLockType,
+ "INSERT INTO foo1 select * from foo partitioned by day",
+ "TaskLock must be of type [SHARED] or [APPEND] for an INSERT query"
+ );
+ }
+ }
+
+ @Test
+ public void testReplaceWithAppendAndSharedLocks()
+ {
+ for (TaskLockType taskLockType : new TaskLockType[]{TaskLockType.APPEND,
TaskLockType.SHARED}) {
+ testLockTypes(
+ taskLockType,
+ "REPLACE INTO foo1 overwrite ALL select * from foo partitioned by
day",
+ "TaskLock must be of type [EXCLUSIVE] or [REPLACE] for a REPLACE
query"
+ );
+ }
+ }
+
+ private void testLockTypes(TaskLockType contextTaskLockType, String sql,
String errorMessage)
+ {
+ Map<String, Object> context = new HashMap<>(DEFAULT_MSQ_CONTEXT);
+ context.put(Tasks.TASK_LOCK_TYPE, contextTaskLockType.name());
+ testIngestQuery()
+ .setSql(
+ sql
+ )
+ .setQueryContext(context)
+ .setExpectedValidationErrorMatcher(
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "general"
+ ).expectMessageContains(
+ errorMessage))
+ .verifyPlanningErrors();
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index b43dd72e88c..2314c10d7e6 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -28,6 +28,8 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -59,6 +61,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -67,6 +70,16 @@ import java.util.TreeSet;
@RunWith(Parameterized.class)
public class MSQInsertTest extends MSQTestBase
{
+
+ private static final String WITH_APPEND_LOCK = "WITH_APPEND_LOCK";
+ private static final Map<String, Object> QUERY_CONTEXT_WITH_APPEND_LOCK =
+ ImmutableMap.<String, Object>builder()
+ .putAll(DEFAULT_MSQ_CONTEXT)
+ .put(
+ Tasks.TASK_LOCK_TYPE,
+ TaskLockType.APPEND.name().toLowerCase(Locale.ENGLISH)
+ )
+ .build();
private final HashFunction fn = Hashing.murmur3_128();
@Parameterized.Parameters(name = "{index}:with context {0}")
@@ -76,7 +89,8 @@ public class MSQInsertTest extends MSQTestBase
{DEFAULT, DEFAULT_MSQ_CONTEXT},
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
- {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
+ {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
+ {WITH_APPEND_LOCK, QUERY_CONTEXT_WITH_APPEND_LOCK}
};
return Arrays.asList(data);
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 0a43fdaea72..144e74b084e 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -20,10 +20,14 @@
package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
@@ -54,6 +58,17 @@ import java.util.TreeSet;
@RunWith(Parameterized.class)
public class MSQReplaceTest extends MSQTestBase
{
+
+ private static final String WITH_REPLACE_LOCK = "WITH_REPLACE_LOCK";
+ private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK =
+ ImmutableMap.<String, Object>builder()
+ .putAll(DEFAULT_MSQ_CONTEXT)
+ .put(
+ Tasks.TASK_LOCK_TYPE,
+ StringUtils.toLowerCase(TaskLockType.REPLACE.name())
+ )
+ .build();
+
@Parameterized.Parameters(name = "{index}:with context {0}")
public static Collection<Object[]> data()
{
@@ -61,7 +76,8 @@ public class MSQReplaceTest extends MSQTestBase
{DEFAULT, DEFAULT_MSQ_CONTEXT},
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
- {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
+ {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
+ {WITH_REPLACE_LOCK, QUERY_CONTEXT_WITH_REPLACE_LOCK}
};
return Arrays.asList(data);
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index 219af6a3188..b63ee479e20 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -97,7 +97,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
@RunWith(Parameterized.class)
@@ -114,7 +113,7 @@ public class MSQSelectTest extends MSQTestBase
.put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 2)
.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
-
MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH)
+
StringUtils.toLowerCase(MSQSelectDestination.DURABLESTORAGE.getName())
)
.build();
@@ -124,7 +123,7 @@ public class MSQSelectTest extends MSQTestBase
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
-
MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH)
+
StringUtils.toLowerCase(MSQSelectDestination.DURABLESTORAGE.getName())
)
.build();
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
index 897e57c93bc..31b3272b74f 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
@@ -28,7 +28,9 @@ import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -121,10 +123,17 @@ public class MSQTestTaskActionClient implements
TaskActionClient
).collect(Collectors.toSet());
}
} else if (taskAction instanceof SegmentTransactionalInsertAction) {
- // Always OK.
final Set<DataSegment> segments = ((SegmentTransactionalInsertAction)
taskAction).getSegments();
publishedSegments.addAll(segments);
return (RetType) SegmentPublishResult.ok(segments);
+ } else if (taskAction instanceof SegmentTransactionalReplaceAction) {
+ final Set<DataSegment> segments = ((SegmentTransactionalReplaceAction)
taskAction).getSegments();
+ publishedSegments.addAll(segments);
+ return (RetType) SegmentPublishResult.ok(segments);
+ } else if (taskAction instanceof SegmentTransactionalAppendAction) {
+ final Set<DataSegment> segments = ((SegmentTransactionalAppendAction)
taskAction).getSegments();
+ publishedSegments.addAll(segments);
+ return (RetType) SegmentPublishResult.ok(segments);
} else {
return null;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index fe19b35391e..cbc469b6133 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -62,6 +62,7 @@ import
org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -483,13 +484,18 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
return TaskLockType.EXCLUSIVE;
}
- final String contextLockType = getContextValue(Tasks.TASK_LOCK_TYPE);
+ final TaskLockType contextTaskLockType = QueryContexts.getAsEnum(
+ Tasks.TASK_LOCK_TYPE,
+ getContextValue(Tasks.TASK_LOCK_TYPE),
+ TaskLockType.class
+ );
+
final TaskLockType lockType;
- if (contextLockType == null) {
+ if (contextTaskLockType == null) {
lockType = getContextValue(Tasks.USE_SHARED_LOCK, false)
? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
} else {
- lockType = TaskLockType.valueOf(contextLockType);
+ lockType = contextTaskLockType;
}
final IngestionMode ingestionMode = getIngestionMode();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]