kfaraz commented on code in PR #16512:
URL: https://github.com/apache/druid/pull/16512#discussion_r1622212798


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1018,4 +1022,37 @@ List<Task> getTasks()
       giant.unlock();
     }
   }
+
+  void validateTaskPayload(Task task)

Review Comment:
   Can this be private?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1018,4 +1022,37 @@ List<Task> getTasks()
       giant.unlock();
     }
   }
+
+  void validateTaskPayload(Task task)
+  {
+    try {
+      String payload = passwordRedactingMapper.writeValueAsString(task);
+      if (payload.length() > TASK_SIZE_WARNING_THRESHOLD) {
+        log.warn("Received a large task payload [%s] with id [%s] and datasource [%s]" +
+                " There may be downstream issues caused by managing this large payload." +
+                "Set druid.indexer.queue.maxTaskPayloadSize to reject tasks above a certain size.",
+            payload.length(),
+            task.getId(),
+            task.getDataSource()
+        );
+      }
+
+      if (config.getMaxTaskPayloadSize() != null && config.getMaxTaskPayloadSize().getBytesInInt() < payload.length()) {
+        throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+            .ofCategory(DruidException.Category.INVALID_INPUT)
+            .build(
+                "Task payload size was [%d] but max size is [%d]. " +
+                    "Reduce the size of the task or increase 'druid.indexer.queue.maxTaskPayloadSize'.",
+                payload.length(),
+                config.getMaxTaskPayloadSize()
+            );
+      }
+    }
+    catch (JsonProcessingException e) {
+      throw DruidException.defensive(
+          "Failed to parse task payload for validation",
+          e
+      );

Review Comment:
   `.defensive()` doesn't take a cause. You can just log the original exception here and not include any root cause in the final `DruidException`, as it is a defensive exception anyway.
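   A rough, self-contained illustration of the suggested pattern: log the original exception (with its stack trace) at the catch site, then throw a message-only exception. `DefensiveException`, `serialize` and `payloadLength` are hypothetical stand-ins for `DruidException.defensive(...)`, `passwordRedactingMapper.writeValueAsString(...)` and the PR's method; no real Druid API is used or reproduced here, and `java.util.logging` stands in for Druid's logger.

   ```java
   import java.io.IOException;
   import java.util.logging.Level;
   import java.util.logging.Logger;

   class DefensiveCatchSketch
   {
     private static final Logger LOG = Logger.getLogger(DefensiveCatchSketch.class.getName());

     /** Hypothetical stand-in for a defensive DruidException: carries a message only, no cause. */
     static class DefensiveException extends RuntimeException
     {
       DefensiveException(String message)
       {
         super(message);
       }
     }

     /** Hypothetical serializer standing in for the JSON mapper; fails on null input. */
     static String serialize(Object task) throws IOException
     {
       if (task == null) {
         throw new IOException("cannot serialize null task");
       }
       return task.toString();
     }

     static int payloadLength(Object task)
     {
       try {
         return serialize(task).length();
       }
       catch (IOException e) {
         // The root cause is kept in the logs, because the thrown exception does not carry it.
         LOG.log(Level.SEVERE, "Failed to serialize task payload for validation", e);
         throw new DefensiveException("Failed to serialize task payload for validation");
       }
     }

     public static void main(String[] args)
     {
       System.out.println(payloadLength("abc"));
     }
   }
   ```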



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1018,4 +1022,37 @@ List<Task> getTasks()
       giant.unlock();
     }
   }
+
+  void validateTaskPayload(Task task)
+  {
+    try {
+      String payload = passwordRedactingMapper.writeValueAsString(task);
+      if (payload.length() > TASK_SIZE_WARNING_THRESHOLD) {
+        log.warn("Received a large task payload [%s] with id [%s] and datasource [%s]" +
+                " There may be downstream issues caused by managing this large payload." +
+                "Set druid.indexer.queue.maxTaskPayloadSize to reject tasks above a certain size.",
+            payload.length(),
+            task.getId(),
+            task.getDataSource()
+        );
+      }
+
+      if (config.getMaxTaskPayloadSize() != null && config.getMaxTaskPayloadSize().getBytesInInt() < payload.length()) {
+        throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+            .ofCategory(DruidException.Category.INVALID_INPUT)
+            .build(
+                "Task payload size was [%d] but max size is [%d]. " +
+                    "Reduce the size of the task or increase 'druid.indexer.queue.maxTaskPayloadSize'.",
+                payload.length(),
+                config.getMaxTaskPayloadSize()
+            );

Review Comment:
   Rephrased the message, used the shorthand for an invalid input exception, and included the task id in the error message:
   
   ```suggestion
           throw InvalidInput.exception(
                   "Task[%s] has payload of size[%d] but max allowed size is [%d]. " +
                       "Reduce the size of the task payload or increase 'druid.indexer.queue.maxTaskPayloadSize'.",
                   task.getId(), payload.length(), config.getMaxTaskPayloadSize()
               );
   ```
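   The rephrased message can be checked in isolation. This plain-Java sketch (the class `PayloadMessageSketch` and method `tooLargeMessage` are hypothetical) formats it with `java.lang.String.format`, passing the maximum as a raw `long` byte count, an assumption made here so that both `%d` placeholders receive integers. With plain `String.format`, a non-integral `%d` argument fails with `IllegalFormatConversionException`; whether Druid's exception builders format arguments the same way is not confirmed here.

   ```java
   class PayloadMessageSketch
   {
     /** Builds the suggested error text; maxAllowedBytes is assumed to be a raw byte count. */
     static String tooLargeMessage(String taskId, int payloadSize, long maxAllowedBytes)
     {
       return String.format(
           "Task[%s] has payload of size[%d] but max allowed size is [%d]. "
           + "Reduce the size of the task payload or increase 'druid.indexer.queue.maxTaskPayloadSize'.",
           taskId, payloadSize, maxAllowedBytes
       );
     }

     public static void main(String[] args)
     {
       System.out.println(tooLargeMessage("index_wiki_2024", 20_000_000, 10_485_760L));
     }
   }
   ```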



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1018,4 +1022,37 @@ List<Task> getTasks()
       giant.unlock();
     }
   }
+
+  void validateTaskPayload(Task task)
+  {
+    try {
+      String payload = passwordRedactingMapper.writeValueAsString(task);
+      if (payload.length() > TASK_SIZE_WARNING_THRESHOLD) {

Review Comment:
   This `if` should be in the `else` branch of the second `if`, i.e.:
   
   ```
   if (maxAllowed != null && payloadSize > maxAllowed) {
       throw exception;
   } else if (payloadSize > warningSize) {
       log warning/raise alert
   }
   ```
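   A minimal, self-contained sketch of the proposed ordering, assuming the size is measured as the length of the serialized JSON string (as `payload.length()` does above). `PayloadSizeCheck`, its `Result` enum, the plain `Long` limit (in place of `HumanReadableBytes`), `IllegalArgumentException` (in place of `InvalidInput.exception`) and `java.util.logging` (in place of Druid's logger) are all hypothetical stand-ins, not Druid code.

   ```java
   import java.util.logging.Logger;

   class PayloadSizeCheck
   {
     private static final Logger LOG = Logger.getLogger(PayloadSizeCheck.class.getName());

     /** Outcome of a non-rejecting check, returned so callers and tests can observe it. */
     enum Result { OK, WARNED }

     /**
      * Rejects the payload if it exceeds maxAllowed (when a limit is set); otherwise
      * warns if it exceeds warningSize. A rejected payload never triggers the warning.
      */
     static Result validate(String taskId, int payloadSize, Long maxAllowed, long warningSize)
     {
       if (maxAllowed != null && payloadSize > maxAllowed) {
         throw new IllegalArgumentException(String.format(
             "Task[%s] has payload of size[%d] but max allowed size is [%d].",
             taskId, payloadSize, maxAllowed
         ));
       } else if (payloadSize > warningSize) {
         LOG.warning(String.format("Task[%s] has a large payload of size[%d].", taskId, payloadSize));
         return Result.WARNED;
       }
       return Result.OK;
     }

     public static void main(String[] args)
     {
       System.out.println(validate("t1", 100, 1000L, 500));  // OK
       System.out.println(validate("t2", 800, 1000L, 500));  // WARNED
       System.out.println(validate("t3", 800, null, 500));   // WARNED: no limit configured
     }
   }
   ```

   Returning the outcome instead of only logging keeps the warning path testable without capturing log output.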



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1018,4 +1022,37 @@ List<Task> getTasks()
       giant.unlock();
     }
   }
+
+  void validateTaskPayload(Task task)
+  {
+    try {
+      String payload = passwordRedactingMapper.writeValueAsString(task);
+      if (payload.length() > TASK_SIZE_WARNING_THRESHOLD) {
+        log.warn("Received a large task payload [%s] with id [%s] and datasource [%s]" +
+                " There may be downstream issues caused by managing this large payload." +

Review Comment:
   Please update the messaging based on the older suggestion.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -214,6 +216,73 @@ public void testAddThrowsExceptionWhenQueueIsFull()
     );
   }
 
+  @Test
+  public void testAddThrowsExceptionWhenPayloadIsTooLarge()
+  {
+    HumanReadableBytes maxPayloadSize = HumanReadableBytes.valueOf(10 * 1024 * 1024);

Review Comment:
   ```suggestion
       HumanReadableBytes maxPayloadSize10Mib = new HumanReadableBytes("10MiB");
   ```
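   For reference, assuming Druid's `MiB` suffix denotes 2^20 bytes as the IEC binary prefix does, `10MiB` is 10 × 1024 × 1024 = 10,485,760 bytes, the same value as the `10 * 1024 * 1024` it replaces. A plain-Java sketch (no Druid classes; `TenMibSketch` and `payloadOfLength` are hypothetical, and `String.repeat` needs Java 11+) of building a payload just over that limit, assuming an ASCII payload so that `String.length()` equals its UTF-8 byte count:

   ```java
   class TenMibSketch
   {
     // 1 MiB is 2^20 bytes, so "10MiB" denotes 10 * 1024 * 1024 bytes.
     static final long TEN_MIB = 10L * 1024 * 1024;

     /** Builds an ASCII payload of the given length (one byte per char in UTF-8). */
     static String payloadOfLength(int length)
     {
       return "a".repeat(length);
     }

     public static void main(String[] args)
     {
       System.out.println(TEN_MIB);  // 10485760
       String payload = payloadOfLength((int) TEN_MIB + 1);
       System.out.println(payload.length() > TEN_MIB);  // true
     }
   }
   ```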



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]