LakshSingla commented on code in PR #13353:
URL: https://github.com/apache/druid/pull/13353#discussion_r1022617563
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java:
##########
@@ -58,4 +58,14 @@
* Maximum size of the kernel manipulation queue in {@link org.apache.druid.msq.indexing.MSQControllerTask}.
*/
public static final int MAX_KERNEL_MANIPULATION_QUEUE_SIZE = 100_000;
+
+ /**
+ * Maximum retries across all workers.
+ */
+ public static final int TOTAL_RETRY_LIMIT = 30;
Review Comment:
Should this scale with the number of workers running the query instead of
being a fixed constant? A server with a large number of task slots might shut
down temporarily, and restarting all of its tasks would consume a huge chunk of
the limit.
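
To make the idea concrete, here is a rough sketch of a limit that grows with the worker count while keeping the current value as a floor. Everything in it is hypothetical: the class name, the `RETRIES_PER_WORKER` budget, and the `totalRetryLimit` helper are not part of this PR, and the per-worker value is picked only for illustration.

```java
// Hypothetical sketch, not code from the PR: all names and values here are assumptions.
final class RetryLimitSketch
{
  // Floor equal to the fixed constant proposed in the diff above.
  static final int MIN_TOTAL_RETRY_LIMIT = 30;

  // Assumed per-worker retry budget; the value is illustrative only.
  static final int RETRIES_PER_WORKER = 2;

  /**
   * Returns a total retry limit that scales with the number of workers
   * but never drops below the fixed floor.
   */
  static int totalRetryLimit(int workerCount)
  {
    if (workerCount < 0) {
      throw new IllegalArgumentException("workerCount must be non-negative");
    }
    // Use long arithmetic so very large worker counts cannot overflow int.
    final long scaled = (long) workerCount * RETRIES_PER_WORKER;
    return (int) Math.min(Integer.MAX_VALUE, Math.max(MIN_TOTAL_RETRY_LIMIT, scaled));
  }
}
```

With these assumed values, a small query keeps the limit of 30, while a query with 100 workers would get 200 retries, so losing one large server would not exhaust the budget on its own.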
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.error;
+
+public class MSQFaultUtils
+{
+
+ public static final String ERROR_CODE_DELIMITER = ": ";
+
+ /**
+ * Generate string message with error code delimited by {@link MSQFaultUtils#ERROR_CODE_DELIMITER}
+ */
+ public static String generateMessageWithErrorCode(MSQFault msqFault)
+ {
+ final String message = msqFault.getErrorMessage();
+
+ if (message != null && !message.isEmpty()) {
+ return msqFault.getErrorCode() + ERROR_CODE_DELIMITER + message;
+ } else {
+ return msqFault.getErrorCode();
+ }
+ }
+
+ /**
+ * Gets the error code from the message. If the messay is empty or null, {@link UnknownFault#CODE} is returned. This method
+ * does not gurantee that the error code we get out of the message is a valid error code.
+ */
+ public static String getErrorCodeFromMessage(String message)
Review Comment:
Instead of manually parsing the error code out of the message string and
defining our own error message schema, can we use jsonMapper to do it for us?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.druid.msq.indexing.error;
+
+public class MSQFaultUtils
+{
+
+ public static final String ERROR_CODE_DELIMITER = ": ";
+
+ /**
+ * Generate string message with error code delimited by {@link MSQFaultUtils#ERROR_CODE_DELIMITER}
+ */
+ public static String generateMessageWithErrorCode(MSQFault msqFault)
+ {
+ final String message = msqFault.getErrorMessage();
+
+ if (message != null && !message.isEmpty()) {
+ return msqFault.getErrorCode() + ERROR_CODE_DELIMITER + message;
+ } else {
+ return msqFault.getErrorCode();
+ }
+ }
+
+ /**
+ * Gets the error code from the message. If the messay is empty or null, {@link UnknownFault#CODE} is returned. This method
+ * does not gurantee that the error code we get out of the message is a valid error code.
+ */
+ public static String getErrorCodeFromMessage(String message)
+ {
+ if (message == null || message.isEmpty()) {
+ return UnknownFault.CODE;
+ }
+ return message.split(ERROR_CODE_DELIMITER, 2)[0];
Review Comment:
A check that this doesn't go out of bounds would be helpful, in case a message
that wasn't produced by generateMessageWithErrorCode reaches this method at some
point in the future. We can return UnknownFault.CODE in that case.
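
A minimal sketch of what I have in mind, under stated assumptions: the class name is hypothetical, and `UNKNOWN_CODE` is a stand-in for `UnknownFault.CODE`, whose actual value is not shown in this diff. It uses `indexOf` rather than `split`, so there is no array indexing at all, and it falls back to the unknown code when nothing usable precedes the delimiter.

```java
// Hypothetical sketch, not code from the PR.
final class ErrorCodeParseSketch
{
  static final String ERROR_CODE_DELIMITER = ": ";

  // Stand-in for UnknownFault.CODE; the real value is an assumption here.
  static final String UNKNOWN_CODE = "UnknownError";

  static String getErrorCodeFromMessage(String message)
  {
    if (message == null || message.isEmpty()) {
      return UNKNOWN_CODE;
    }
    final int delimiterIndex = message.indexOf(ERROR_CODE_DELIMITER);
    // No delimiter: the message is a bare error code, as produced when the fault has no message.
    final String code = delimiterIndex < 0 ? message : message.substring(0, delimiterIndex);
    // Nothing before the delimiter: treat the message as malformed.
    return code.isEmpty() ? UNKNOWN_CODE : code;
  }
}
```

For example, `"SomeCode: details"` yields `SomeCode`, a bare `"SomeCode"` is returned unchanged, and `": details"` yields the unknown code. Like the original, this does not validate that the extracted string is a known error code.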
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -546,4 +614,65 @@ private static Map<StageId, Set<StageId>> computeStageOutflowMap(final QueryDefi
return retVal;
}
+
+ /**
+ * Checks the {@link MSQFault#getErrorCode()} is eligible for retry.
+ * <br/>
+ * If yes, transitions the stage to{@link ControllerStagePhase#RETRYING} and returns all the {@link WorkOrder}
+ * <br/>
+ * else throw {@link MSQException}
+ *
+ * @param workerNumber
+ * @param msqFault
+ * @return List of {@link WorkOrder} that needs to be retried.
+ */
+ public List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int workerNumber, MSQFault msqFault)
Review Comment:
nit:
1. Spelling
```suggestion
public List<WorkOrder> getWorkInCaseWorkerEligibleForRetryElseThrow(int workerNumber, MSQFault msqFault)
```
2. This method name seems unwieldy. Can we drop the `ElseThrow` part?
Something like `getWorkFromRetriableWorkers` seems cleaner to me. Opinions?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TotalRelaunchLimitExceededFault.java:
##########
@@ -0,0 +1,117 @@
+package org.apache.druid.msq.indexing.error;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Objects;
+
+@JsonTypeName(TotalRelaunchLimitExceededFault.CODE)
+public class TotalRelaunchLimitExceededFault extends BaseMSQFault
Review Comment:
```suggestion
public class TotalRetryLimitExceededFault extends BaseMSQFault
```
Should we change relaunch to retry everywhere in this fault class?