LakshSingla commented on code in PR #15168:
URL: https://github.com/apache/druid/pull/15168#discussion_r1360883310


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1467,6 +1480,34 @@ private void publishAllSegments(final Set<DataSegment> 
segments) throws IOExcept
     task.emitMetric(context.emitter(), "ingest/segments/count", 
segmentsWithTombstones.size());
   }
 
+  private static TaskAction<SegmentPublishResult> createAppendAction(
+      Set<DataSegment> segments,
+      TaskLockType taskLockType
+  )
+  {
+    if (taskLockType.equals(TaskLockType.APPEND)) {
+      return SegmentTransactionalAppendAction.forSegments(segments);
+    } else if (taskLockType.equals(TaskLockType.SHARED)) {
+      return SegmentTransactionalInsertAction.appendAction(segments, null, 
null);
+    } else {
+      throw DruidException.defensive("Invalid lock type %s received for append 
action", taskLockType);

Review Comment:
   nit: interpolation — wrap the interpolated value in square brackets.
   ```suggestion
         throw DruidException.defensive("Invalid lock type [%s] received for 
append action", taskLockType);
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1467,6 +1480,34 @@ private void publishAllSegments(final Set<DataSegment> 
segments) throws IOExcept
     task.emitMetric(context.emitter(), "ingest/segments/count", 
segmentsWithTombstones.size());
   }
 
+  private static TaskAction<SegmentPublishResult> createAppendAction(
+      Set<DataSegment> segments,
+      TaskLockType taskLockType
+  )
+  {
+    if (taskLockType.equals(TaskLockType.APPEND)) {
+      return SegmentTransactionalAppendAction.forSegments(segments);
+    } else if (taskLockType.equals(TaskLockType.SHARED)) {
+      return SegmentTransactionalInsertAction.appendAction(segments, null, 
null);
+    } else {
+      throw DruidException.defensive("Invalid lock type %s received for append 
action", taskLockType);
+    }
+  }
+
+  private TaskAction<SegmentPublishResult> createOverwriteAction(
+      TaskLockType taskLockType,
+      Set<DataSegment> segmentsWithTombstones
+  )
+  {
+    if (taskLockType.equals(TaskLockType.REPLACE)) {
+      return SegmentTransactionalReplaceAction.create(segmentsWithTombstones);
+    } else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) {
+      return SegmentTransactionalInsertAction.overwriteAction(null, 
segmentsWithTombstones);
+    } else {
+      throw DruidException.defensive("Invalid lock type %s received for 
overwrite action", taskLockType);

Review Comment:
   ```suggestion
         throw DruidException.defensive("Invalid lock type [%s] received for 
overwrite action", taskLockType);
   ```



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java:
##########
@@ -374,4 +378,47 @@ public void testTopLevelUnionAllWithJoins()
                 "SQL requires union between inputs that are not simple table 
scans and involve a filter or aliasing"))
         .verifyPlanningErrors();
   }
+
+  @Test
+  public void testInsertWithReplaceAndExcludeLocks()
+  {
+    for (TaskLockType taskLockType : new 
TaskLockType[]{TaskLockType.EXCLUSIVE, TaskLockType.REPLACE}) {
+      testLockTypes(
+          taskLockType,
+          "insert into foo1 select * from foo partitioned by day",

Review Comment:
   nit: Use uppercase SQL keywords in the test cases.
   ```suggestion
             "INSERT INTO foo1 SELECT * FROM foo PARTITIONED BY DAY",
   ```



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -47,21 +50,34 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
 @RunWith(Parameterized.class)
 public class MSQReplaceTest extends MSQTestBase
 {
+
+  private static final String WITH_REPLACE_LOCK = "WITH_REPLACE_LOCK";
+  private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK =
+      ImmutableMap.<String, Object>builder()
+                  .putAll(DEFAULT_MSQ_CONTEXT)
+                  .put(
+                      Tasks.TASK_LOCK_TYPE,
+                      TaskLockType.REPLACE.name().toLowerCase(Locale.ENGLISH)

Review Comment:
   ```suggestion
                         StringUtils.toLowerCase(TaskLockType.REPLACE.name())
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -350,4 +359,53 @@ static IndexSpec decodeIndexSpec(@Nullable final Object 
indexSpecObject, final O
       throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec", 
indexSpecObject);
     }
   }
+
+  /**
+   * This method is used to validate and get the taskLockType from the 
queryContext.
+   * If the queryContext does not contain the taskLockType, then {@link 
TaskLockType#EXCLUSIVE} is used for replace queries and
+   * {@link TaskLockType#SHARED} is used for insert queries.
+   * If the queryContext contains the taskLockType, then it is validated and 
returned.
+   */
+  public static TaskLockType validateAndGetTaskLockType(QueryContext 
queryContext, boolean isReplaceQuery)
+  {
+    final TaskLockType taskLockType = QueryContexts.getAsEnum(
+        Tasks.TASK_LOCK_TYPE,
+        queryContext.getString(Tasks.TASK_LOCK_TYPE, null),
+        TaskLockType.class
+    );
+    if (taskLockType == null) {
+      if (isReplaceQuery) {
+        return TaskLockType.EXCLUSIVE;
+      } else {
+        return TaskLockType.APPEND;
+      }
+    }
+    final String appendErrorMessage = StringUtils.format(
+        "Please use context parameter with key %s to set the taskLockType or "
+        + "remove this key for automatic lock type selection", 
Tasks.TASK_LOCK_TYPE);
+
+    if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) || 
taskLockType.equals(TaskLockType.REPLACE))) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build(
+                              "TaskLock must be of type %s or %s for a replace 
query. Found type %s set."

Review Comment:
   Capitalization and interpolation: uppercase the SQL keyword and wrap the interpolated lock type in square brackets.
   ```suggestion
                                 "TaskLock must be of type %s or %s for a 
REPLACE query. Invalid lock type [%s] found."
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -350,4 +359,53 @@ static IndexSpec decodeIndexSpec(@Nullable final Object 
indexSpecObject, final O
       throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec", 
indexSpecObject);
     }
   }
+
+  /**
+   * This method is used to validate and get the taskLockType from the 
queryContext.
+   * If the queryContext does not contain the taskLockType, then {@link 
TaskLockType#EXCLUSIVE} is used for replace queries and
+   * {@link TaskLockType#SHARED} is used for insert queries.
+   * If the queryContext contains the taskLockType, then it is validated and 
returned.
+   */
+  public static TaskLockType validateAndGetTaskLockType(QueryContext 
queryContext, boolean isReplaceQuery)
+  {
+    final TaskLockType taskLockType = QueryContexts.getAsEnum(
+        Tasks.TASK_LOCK_TYPE,
+        queryContext.getString(Tasks.TASK_LOCK_TYPE, null),
+        TaskLockType.class
+    );
+    if (taskLockType == null) {
+      if (isReplaceQuery) {
+        return TaskLockType.EXCLUSIVE;
+      } else {
+        return TaskLockType.APPEND;
+      }
+    }
+    final String appendErrorMessage = StringUtils.format(
+        "Please use context parameter with key %s to set the taskLockType or "
+        + "remove this key for automatic lock type selection", 
Tasks.TASK_LOCK_TYPE);
+
+    if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) || 
taskLockType.equals(TaskLockType.REPLACE))) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build(
+                              "TaskLock must be of type %s or %s for a replace 
query. Found type %s set."
+                              + appendErrorMessage,
+                              TaskLockType.EXCLUSIVE,
+                              TaskLockType.REPLACE,
+                              taskLockType
+                          );
+    }
+    if (!isReplaceQuery && !(taskLockType.equals(TaskLockType.SHARED) || 
taskLockType.equals(TaskLockType.APPEND))) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build(
+                              "TaskLock must be of type %s or %s for an insert 
query. Found type %s set."

Review Comment:
   ```suggestion
                                 "TaskLock must be of type %s or %s for an 
INSERT query. Invalid lock type [%s] found."
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to