LakshSingla commented on code in PR #15168:
URL: https://github.com/apache/druid/pull/15168#discussion_r1360883310
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1467,6 +1480,34 @@ private void publishAllSegments(final Set<DataSegment>
segments) throws IOExcept
task.emitMetric(context.emitter(), "ingest/segments/count",
segmentsWithTombstones.size());
}
+ private static TaskAction<SegmentPublishResult> createAppendAction(
+ Set<DataSegment> segments,
+ TaskLockType taskLockType
+ )
+ {
+ if (taskLockType.equals(TaskLockType.APPEND)) {
+ return SegmentTransactionalAppendAction.forSegments(segments);
+ } else if (taskLockType.equals(TaskLockType.SHARED)) {
+ return SegmentTransactionalInsertAction.appendAction(segments, null,
null);
+ } else {
+ throw DruidException.defensive("Invalid lock type %s received for append
action", taskLockType);
Review Comment:
nit: interpolation - wrap the interpolated value in square brackets
```suggestion
throw DruidException.defensive("Invalid lock type [%s] received for
append action", taskLockType);
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1467,6 +1480,34 @@ private void publishAllSegments(final Set<DataSegment>
segments) throws IOExcept
task.emitMetric(context.emitter(), "ingest/segments/count",
segmentsWithTombstones.size());
}
+ private static TaskAction<SegmentPublishResult> createAppendAction(
+ Set<DataSegment> segments,
+ TaskLockType taskLockType
+ )
+ {
+ if (taskLockType.equals(TaskLockType.APPEND)) {
+ return SegmentTransactionalAppendAction.forSegments(segments);
+ } else if (taskLockType.equals(TaskLockType.SHARED)) {
+ return SegmentTransactionalInsertAction.appendAction(segments, null,
null);
+ } else {
+ throw DruidException.defensive("Invalid lock type %s received for append
action", taskLockType);
+ }
+ }
+
+ private TaskAction<SegmentPublishResult> createOverwriteAction(
+ TaskLockType taskLockType,
+ Set<DataSegment> segmentsWithTombstones
+ )
+ {
+ if (taskLockType.equals(TaskLockType.REPLACE)) {
+ return SegmentTransactionalReplaceAction.create(segmentsWithTombstones);
+ } else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) {
+ return SegmentTransactionalInsertAction.overwriteAction(null,
segmentsWithTombstones);
+ } else {
+ throw DruidException.defensive("Invalid lock type %s received for
overwrite action", taskLockType);
Review Comment:
```suggestion
throw DruidException.defensive("Invalid lock type [%s] received for
overwrite action", taskLockType);
```
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java:
##########
@@ -374,4 +378,47 @@ public void testTopLevelUnionAllWithJoins()
"SQL requires union between inputs that are not simple table
scans and involve a filter or aliasing"))
.verifyPlanningErrors();
}
+
+ @Test
+ public void testInsertWithReplaceAndExcludeLocks()
+ {
+ for (TaskLockType taskLockType : new
TaskLockType[]{TaskLockType.EXCLUSIVE, TaskLockType.REPLACE}) {
+ testLockTypes(
+ taskLockType,
+ "insert into foo1 select * from foo partitioned by day",
Review Comment:
nit: Use uppercase SQL keywords in the test queries.
```suggestion
"INSERT INTO foo1 SELECT * FROM foo PARTITIONED BY DAY",
```
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -47,21 +50,34 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@RunWith(Parameterized.class)
public class MSQReplaceTest extends MSQTestBase
{
+
+ private static final String WITH_REPLACE_LOCK = "WITH_REPLACE_LOCK";
+ private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK =
+ ImmutableMap.<String, Object>builder()
+ .putAll(DEFAULT_MSQ_CONTEXT)
+ .put(
+ Tasks.TASK_LOCK_TYPE,
+ TaskLockType.REPLACE.name().toLowerCase(Locale.ENGLISH)
Review Comment:
```suggestion
StringUtils.toLowerCase(TaskLockType.REPLACE.name())
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -350,4 +359,53 @@ static IndexSpec decodeIndexSpec(@Nullable final Object
indexSpecObject, final O
throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec",
indexSpecObject);
}
}
+
+ /**
+ * This method is used to validate and get the taskLockType from the
queryContext.
+ * If the queryContext does not contain the taskLockType, then {@link
TaskLockType#EXCLUSIVE} is used for replace queries and
+ * {@link TaskLockType#SHARED} is used for insert queries.
+ * If the queryContext contains the taskLockType, then it is validated and
returned.
+ */
+ public static TaskLockType validateAndGetTaskLockType(QueryContext
queryContext, boolean isReplaceQuery)
+ {
+ final TaskLockType taskLockType = QueryContexts.getAsEnum(
+ Tasks.TASK_LOCK_TYPE,
+ queryContext.getString(Tasks.TASK_LOCK_TYPE, null),
+ TaskLockType.class
+ );
+ if (taskLockType == null) {
+ if (isReplaceQuery) {
+ return TaskLockType.EXCLUSIVE;
+ } else {
+ return TaskLockType.APPEND;
+ }
+ }
+ final String appendErrorMessage = StringUtils.format(
+ "Please use context parameter with key %s to set the taskLockType or "
+ + "remove this key for automatic lock type selection",
Tasks.TASK_LOCK_TYPE);
+
+ if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) ||
taskLockType.equals(TaskLockType.REPLACE))) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "TaskLock must be of type %s or %s for a replace
query. Found type %s set."
Review Comment:
Capitalization and interpolation: uppercase the SQL keyword and wrap the interpolated value in square brackets.
```suggestion
"TaskLock must be of type %s or %s for a
REPLACE query. Invalid lock type [%s] found."
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -350,4 +359,53 @@ static IndexSpec decodeIndexSpec(@Nullable final Object
indexSpecObject, final O
throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec",
indexSpecObject);
}
}
+
+ /**
+ * This method is used to validate and get the taskLockType from the
queryContext.
+ * If the queryContext does not contain the taskLockType, then {@link
TaskLockType#EXCLUSIVE} is used for replace queries and
+ * {@link TaskLockType#SHARED} is used for insert queries.
+ * If the queryContext contains the taskLockType, then it is validated and
returned.
+ */
+ public static TaskLockType validateAndGetTaskLockType(QueryContext
queryContext, boolean isReplaceQuery)
+ {
+ final TaskLockType taskLockType = QueryContexts.getAsEnum(
+ Tasks.TASK_LOCK_TYPE,
+ queryContext.getString(Tasks.TASK_LOCK_TYPE, null),
+ TaskLockType.class
+ );
+ if (taskLockType == null) {
+ if (isReplaceQuery) {
+ return TaskLockType.EXCLUSIVE;
+ } else {
+ return TaskLockType.APPEND;
+ }
+ }
+ final String appendErrorMessage = StringUtils.format(
+ "Please use context parameter with key %s to set the taskLockType or "
+ + "remove this key for automatic lock type selection",
Tasks.TASK_LOCK_TYPE);
+
+ if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) ||
taskLockType.equals(TaskLockType.REPLACE))) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "TaskLock must be of type %s or %s for a replace
query. Found type %s set."
+ + appendErrorMessage,
+ TaskLockType.EXCLUSIVE,
+ TaskLockType.REPLACE,
+ taskLockType
+ );
+ }
+ if (!isReplaceQuery && !(taskLockType.equals(TaskLockType.SHARED) ||
taskLockType.equals(TaskLockType.APPEND))) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "TaskLock must be of type %s or %s for an insert
query. Found type %s set."
Review Comment:
```suggestion
"TaskLock must be of type %s or %s for an
INSERT query. Invalid lock type [%s] found."
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]