kfaraz commented on code in PR #16512:
URL: https://github.com/apache/druid/pull/16512#discussion_r1618953072


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -509,6 +509,23 @@ public boolean add(final Task task)
       throw EntryAlreadyExists.exception("Task[%s] already exists", task.getId());
     }
 
+    String payload = null;
+    try {
+      payload = passwordRedactingMapper.writeValueAsString(task);
+    }
+    catch (JsonProcessingException ignored) {

Review Comment:
   Throw a `DruidException.defensive` in this catch instead of ignoring the exception.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -509,6 +509,22 @@ public boolean add(final Task task)
       throw EntryAlreadyExists.exception("Task[%s] already exists", task.getId());
     }
 
+    String payload = null;

Review Comment:
   Move the whole new logic to a separate private method 
`validateTaskPayload()`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java:
##########
@@ -42,20 +42,27 @@ public class TaskQueueConfig
   @JsonProperty
   private int taskCompleteHandlerNumThreads;
 
+  @JsonProperty
+  private Long maxTaskPayloadSize;
+
   @JsonCreator
   public TaskQueueConfig(
       @JsonProperty("maxSize") final Integer maxSize,
       @JsonProperty("startDelay") final Period startDelay,
       @JsonProperty("restartDelay") final Period restartDelay,
       @JsonProperty("storageSyncRate") final Period storageSyncRate,
-      @JsonProperty("taskCompleteHandlerNumThreads") final Integer taskCompleteHandlerNumThreads
+      @JsonProperty("taskCompleteHandlerNumThreads") final Integer taskCompleteHandlerNumThreads,
+      @JsonProperty("maxTaskPayloadSize") final Long maxTaskPayloadSize

Review Comment:
   We should also annotate this with `@Nullable`.



##########
docs/configuration/index.md:
##########
@@ -1125,6 +1125,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
 |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself (for example, after a widespread network issue).|`PT1M`|
 |`druid.indexer.queue.restartDelay`|Sleep this long when Overlord queue management throws an exception before trying again.|`PT30S`|
 |`druid.indexer.queue.storageSyncRate`|Sync Overlord state this often with an underlying task persistence mechanism.|`PT1M`|
+|`druid.indexer.queue.maxTaskPayloadSize`|Maximum size in bytes of a single task payload.|`62914560`|

Review Comment:
   ```suggestion
   |`druid.indexer.queue.maxTaskPayloadBytes`|Maximum allowed size of a single task payload in bytes.|`62914560` (60 MiB)|
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -509,6 +509,23 @@ public boolean add(final Task task)
       throw EntryAlreadyExists.exception("Task[%s] already exists", task.getId());
     }
 
+    String payload = null;
+    try {
+      payload = passwordRedactingMapper.writeValueAsString(task);
+    }
+    catch (JsonProcessingException ignored) {
+    }
+    if (payload != null && payload.length() > config.getMaxTaskPayloadSize()) {

Review Comment:
   Nit: This `if` check should be inside the try. Then we can avoid the null 
check and the scope of `payload` can be limited to the try block.
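
A minimal sketch of how the three `TaskQueue` comments could fit together: a separate validation method, the size check inside the `try`, and a thrown exception in the `catch`. The class name, the `PayloadSerializer` interface and the use of `IllegalStateException` are hypothetical stand-ins so that the example compiles without Druid or Jackson; in the PR they would correspond to the mapper's `writeValueAsString` call and `DruidException.defensive`.

```java
// Hypothetical stand-ins only: not the actual TaskQueue code from the PR.
final class PayloadValidationSketch
{
  /** Stand-in for a mapper call that throws a checked exception while serializing. */
  interface PayloadSerializer
  {
    String serialize(Object task) throws Exception;
  }

  /**
   * Returns true if the serialized payload is longer than maxPayloadSize.
   * The check sits inside the try, so no null check is needed and
   * `payload` is scoped to the try block.
   */
  static boolean isPayloadTooLarge(Object task, PayloadSerializer serializer, long maxPayloadSize)
  {
    try {
      final String payload = serializer.serialize(task);
      // length() counts chars, mirroring the check in the diff under review
      return payload.length() > maxPayloadSize;
    }
    catch (Exception e) {
      // Stand-in for DruidException.defensive(...): fail loudly instead of ignoring the error
      throw new IllegalStateException("Failed to serialize task payload", e);
    }
  }

  public static void main(String[] args)
  {
    // Object::toString plays the role of the JSON mapper in this sketch
    System.out.println(isPayloadTooLarge("small task", Object::toString, 1024));
  }
}
```

In the real method the caller would log the warning when the check returns true; the sketch returns a boolean only to keep the example free of logging dependencies.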



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java:
##########
@@ -42,20 +42,27 @@ public class TaskQueueConfig
   @JsonProperty
   private int taskCompleteHandlerNumThreads;
 
+  @JsonProperty
+  private Long maxTaskPayloadSize;

Review Comment:
   Alternatively, do you think it would be better to have this config in 
human-readable bytes?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java:
##########
@@ -42,20 +42,27 @@ public class TaskQueueConfig
   @JsonProperty
   private int taskCompleteHandlerNumThreads;
 
+  @JsonProperty
+  private Long maxTaskPayloadSize;

Review Comment:
   We need not use a boxed value, since we assign a default value anyway. 
The getter can also be updated to return a primitive `long`.
   ```suggestion
     private final long maxTaskPayloadSize;
   ```
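
A minimal sketch of the resulting shape, assuming the default is the documented `62914560`: the constructor parameter stays a nullable `Long` so that an absent config value can be detected, while the field and getter use a primitive `long`. The class name is hypothetical, and the `@JsonCreator`, `@JsonProperty` and `@Nullable` annotations are left out so the example compiles without Jackson.

```java
// Hypothetical stand-in for TaskQueueConfig; annotations omitted on purpose.
final class TaskQueueConfigSketch
{
  // 60 * 1024 * 1024 = 62914560, the default shown in the docs diff
  static final long DEFAULT_MAX_TASK_PAYLOAD_SIZE = 60L * 1024 * 1024;

  private final long maxTaskPayloadSize;

  // In the real class this parameter would carry @JsonProperty and @Nullable
  TaskQueueConfigSketch(Long maxTaskPayloadSize)
  {
    this.maxTaskPayloadSize = maxTaskPayloadSize == null
                              ? DEFAULT_MAX_TASK_PAYLOAD_SIZE
                              : maxTaskPayloadSize;
  }

  // Primitive return type: callers never have to handle null
  long getMaxTaskPayloadSize()
  {
    return maxTaskPayloadSize;
  }

  public static void main(String[] args)
  {
    System.out.println(new TaskQueueConfigSketch(null).getMaxTaskPayloadSize());
  }
}
```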



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -509,6 +509,22 @@ public boolean add(final Task task)
       throw EntryAlreadyExists.exception("Task[%s] already exists", task.getId());
     }
 
+    String payload = null;
+    try {
+      payload = passwordRedactingMapper.writeValueAsString(task);
+    }
+    catch (JsonProcessingException ignored) {
+    }
+    if (payload != null && payload.length() > config.getMaxTaskPayloadSize()) {
+      log.warn("Received a task payload > [%d] with id [%s]. and datasource [%s]" +
+              " There may be downstream issues caused by managing this large payload." +
+              "Increase druid.indexer.queue.maxTaskPayloadSize to ignore this warning.",
+          config.getMaxTaskPayloadSize(),
+          task.getId(),
+          task.getDataSource()
+      );

Review Comment:
   ```suggestion
         log.warn(
             "Task[%s] of datasource[%s] has payload size[%d] larger than the recommended maximum[%d]."
             + " Large task payloads may cause stability issues in the Overlord and may fail while persisting to the metadata store."
             + " Use smaller task payloads or increase 'druid.indexer.queue.maxTaskPayloadSize' to suppress this warning.",
             task.getId(), task.getDataSource(), payload.length(), config.getMaxTaskPayloadSize()
         );
   ```



##########
docs/configuration/index.md:
##########
@@ -1125,6 +1125,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
 |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself (for example, after a widespread network issue).|`PT1M`|
 |`druid.indexer.queue.restartDelay`|Sleep this long when Overlord queue management throws an exception before trying again.|`PT30S`|
 |`druid.indexer.queue.storageSyncRate`|Sync Overlord state this often with an underlying task persistence mechanism.|`PT1M`|
+|`druid.indexer.queue.maxTaskPayloadSize`|Maximum size in bytes of a single task payload.|`62914560`|

Review Comment:
   If we decide to specify the config in human-readable bytes, then we can 
retain the name `maxTaskPayloadSize`.
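
To illustrate what a human-readable value would mean for this config, here is a minimal sketch that converts strings such as `60MiB` into a byte count. It is a hypothetical stand-in written for this example, not Druid's own parser, and it only understands plain numbers and the binary suffixes `KiB`, `MiB` and `GiB`.

```java
// Hypothetical helper for illustration; not part of Druid.
final class HumanReadableSizeSketch
{
  /** Parses a plain byte count or a value with a KiB/MiB/GiB suffix. */
  static long parse(String text)
  {
    final String s = text.trim();
    if (s.endsWith("KiB")) {
      return scale(s, 1L << 10);
    }
    if (s.endsWith("MiB")) {
      return scale(s, 1L << 20);
    }
    if (s.endsWith("GiB")) {
      return scale(s, 1L << 30);
    }
    return Long.parseLong(s);
  }

  private static long scale(String s, long unit)
  {
    // Drop the three-character suffix, then multiply with overflow checking
    final long value = Long.parseLong(s.substring(0, s.length() - 3).trim());
    return Math.multiplyExact(value, unit);
  }

  public static void main(String[] args)
  {
    // 60 MiB is the same number of bytes as the documented default 62914560
    System.out.println(parse("60MiB") == 62914560L);
  }
}
```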



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

