kfaraz commented on code in PR #16512:
URL: https://github.com/apache/druid/pull/16512#discussion_r1618953072
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -509,6 +509,23 @@ public boolean add(final Task task)
throw EntryAlreadyExists.exception("Task[%s] already exists", task.getId());
}
+ String payload = null;
+ try {
+ payload = passwordRedactingMapper.writeValueAsString(task);
+ }
+ catch (JsonProcessingException ignored) {
Review Comment:
throw a `DruidException.defensive` in this catch.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -509,6 +509,22 @@ public boolean add(final Task task)
throw EntryAlreadyExists.exception("Task[%s] already exists", task.getId());
}
+ String payload = null;
Review Comment:
Move the whole new logic to a separate private method `validateTaskPayload()`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java:
##########
@@ -42,20 +42,27 @@ public class TaskQueueConfig
@JsonProperty
private int taskCompleteHandlerNumThreads;
+ @JsonProperty
+ private Long maxTaskPayloadSize;
+
@JsonCreator
public TaskQueueConfig(
@JsonProperty("maxSize") final Integer maxSize,
@JsonProperty("startDelay") final Period startDelay,
@JsonProperty("restartDelay") final Period restartDelay,
@JsonProperty("storageSyncRate") final Period storageSyncRate,
- @JsonProperty("taskCompleteHandlerNumThreads") final Integer taskCompleteHandlerNumThreads
+ @JsonProperty("taskCompleteHandlerNumThreads") final Integer taskCompleteHandlerNumThreads,
+ @JsonProperty("maxTaskPayloadSize") final Long maxTaskPayloadSize
Review Comment:
We should also annotate this with `@Nullable`.
##########
docs/configuration/index.md:
##########
@@ -1125,6 +1125,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself (for example, after a widespread network issue).|`PT1M`|
|`druid.indexer.queue.restartDelay`|Sleep this long when Overlord queue management throws an exception before trying again.|`PT30S`|
|`druid.indexer.queue.storageSyncRate`|Sync Overlord state this often with an underlying task persistence mechanism.|`PT1M`|
+|`druid.indexer.queue.maxTaskPayloadSize`|Maximum size in bytes of a single task payload.|`62914560`|
Review Comment:
```suggestion
|`druid.indexer.queue.maxTaskPayloadBytes`|Maximum allowed size of a single task payload in bytes.|`62914560` (60 MiB)|
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -509,6 +509,23 @@ public boolean add(final Task task)
throw EntryAlreadyExists.exception("Task[%s] already exists", task.getId());
}
+ String payload = null;
+ try {
+ payload = passwordRedactingMapper.writeValueAsString(task);
+ }
+ catch (JsonProcessingException ignored) {
+ }
+ if (payload != null && payload.length() > config.getMaxTaskPayloadSize()) {
Review Comment:
Nit: This `if` check should be inside the try. Then we can avoid the null check, and the scope of `payload` can be limited to the try block.
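Taken together with the two earlier comments (throw instead of swallowing the exception, and extract a `validateTaskPayload()` method), the refactoring might look roughly like the sketch below. It is hypothetical and self-contained: `PayloadSerializer` stands in for `passwordRedactingMapper.writeValueAsString(task)` (Jackson's `JsonProcessingException` is an `IOException`), `IllegalStateException` stands in for `DruidException.defensive(...)`, and `java.util.logging` replaces Druid's logger. It also measures the UTF-8 encoded length rather than `payload.length()` (a count of UTF-16 chars), on the assumption that the limit is meant literally in bytes.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;

/** Hypothetical sketch of the validateTaskPayload() refactoring; not the PR's actual code. */
class TaskPayloadValidatorSketch
{
  /** Stand-in for ObjectMapper.writeValueAsString(), which can throw an IOException subtype. */
  @FunctionalInterface
  interface PayloadSerializer
  {
    String serialize(Object task) throws IOException;
  }

  private static final Logger LOG = Logger.getLogger(TaskPayloadValidatorSketch.class.getName());

  private final PayloadSerializer serializer;
  private final long maxTaskPayloadSize;

  TaskPayloadValidatorSketch(PayloadSerializer serializer, long maxTaskPayloadSize)
  {
    this.serializer = serializer;
    this.maxTaskPayloadSize = maxTaskPayloadSize;
  }

  /**
   * Would be private in TaskQueue; package-private here so the sketch can be exercised.
   * Returns true if the payload exceeds the recommended maximum (a warning is logged).
   */
  boolean validateTaskPayload(Object task, String taskId)
  {
    try {
      // payload is scoped to the try block, so no null check is needed afterwards.
      final String payload = serializer.serialize(task);
      // Count encoded bytes, since the limit is documented in bytes.
      final long payloadBytes = payload.getBytes(StandardCharsets.UTF_8).length;
      if (payloadBytes > maxTaskPayloadSize) {
        LOG.warning(String.format(
            "Task[%s] has payload size[%d] larger than the recommended maximum[%d].",
            taskId, payloadBytes, maxTaskPayloadSize
        ));
        return true;
      }
      return false;
    }
    catch (IOException e) {
      // Stand-in for DruidException.defensive(...): fail loudly instead of ignoring the error.
      throw new IllegalStateException("Could not serialize payload of task[" + taskId + "]", e);
    }
  }
}
```

Keeping the check inside the try means a serialization failure can no longer silently skip the size check, which is what the original `catch (JsonProcessingException ignored)` allowed.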
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java:
##########
@@ -42,20 +42,27 @@ public class TaskQueueConfig
@JsonProperty
private int taskCompleteHandlerNumThreads;
+ @JsonProperty
+ private Long maxTaskPayloadSize;
Review Comment:
Alternatively, do you think it would be better to have this config in human-readable bytes?
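For illustration, a human-readable value such as `60MiB` would resolve to the same 62914560-byte default (60 × 1024 × 1024). The parser below is a simplified, hypothetical sketch of that conversion; Druid has a `HumanReadableBytes` class intended for byte-sized configs, which would be the natural choice in the real change, and its accepted formats may differ from this sketch.

```java
import java.util.Locale;

/**
 * Hypothetical, simplified parser for sizes such as "60MiB", "64M" or "62914560".
 * Binary suffixes (KiB, MiB, GiB, TiB) use powers of 1024; decimal suffixes
 * (K, M, G, T) use powers of 1000; a bare number is taken as bytes.
 */
class HumanReadableBytesSketch
{
  private static final String[] BINARY_SUFFIXES = {"KIB", "MIB", "GIB", "TIB"};
  private static final String[] DECIMAL_SUFFIXES = {"K", "M", "G", "T"};

  static long parse(String value)
  {
    final String s = value.trim().toUpperCase(Locale.ROOT);
    // KiB = 2^10, MiB = 2^20, GiB = 2^30, TiB = 2^40.
    for (int i = 0; i < BINARY_SUFFIXES.length; i++) {
      if (s.endsWith(BINARY_SUFFIXES[i])) {
        return scale(s.substring(0, s.length() - 3), 1L << (10 * (i + 1)));
      }
    }
    // K = 10^3, M = 10^6, G = 10^9, T = 10^12.
    for (int i = 0; i < DECIMAL_SUFFIXES.length; i++) {
      if (s.endsWith(DECIMAL_SUFFIXES[i])) {
        long factor = 1L;
        for (int j = 0; j <= i; j++) {
          factor *= 1000L;
        }
        return scale(s.substring(0, s.length() - 1), factor);
      }
    }
    // No suffix: the value is already in bytes.
    return Long.parseLong(s);
  }

  private static long scale(String number, long factor)
  {
    // multiplyExact throws ArithmeticException instead of silently overflowing.
    return Math.multiplyExact(Long.parseLong(number.trim()), factor);
  }
}
```

With this kind of parsing, `60MiB` and `62914560` are interchangeable, which is why the property name would not need a `Bytes` suffix.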
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java:
##########
@@ -42,20 +42,27 @@ public class TaskQueueConfig
@JsonProperty
private int taskCompleteHandlerNumThreads;
+ @JsonProperty
+ private Long maxTaskPayloadSize;
Review Comment:
We need not use a boxed value, since we assign a default value anyway. The getter can also be updated to return a primitive `long`.
```suggestion
private final long maxTaskPayloadSize;
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -509,6 +509,22 @@ public boolean add(final Task task)
throw EntryAlreadyExists.exception("Task[%s] already exists", task.getId());
}
+ String payload = null;
+ try {
+ payload = passwordRedactingMapper.writeValueAsString(task);
+ }
+ catch (JsonProcessingException ignored) {
+ }
+ if (payload != null && payload.length() > config.getMaxTaskPayloadSize()) {
+ log.warn("Received a task payload > [%d] with id [%s]. and datasource [%s]" +
+ " There may be downstream issues caused by managing this large payload." +
+ "Increase druid.indexer.queue.maxTaskPayloadSize to ignore this warning.",
+ config.getMaxTaskPayloadSize(),
+ task.getId(),
+ task.getDataSource()
+ );
Review Comment:
```suggestion
    log.warn(
        "Task[%s] of datasource[%s] has payload size[%d] larger than the recommended maximum[%d]."
        + " Large task payloads may cause stability issues in the Overlord and may fail while persisting to the metadata store."
        + " Use smaller task payloads or increase 'druid.indexer.queue.maxTaskPayloadSize' to suppress this warning.",
        task.getId(), task.getDataSource(), payload.length(), config.getMaxTaskPayloadSize()
    );
```
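The suggested template has four placeholders (task id, datasource, actual payload size, configured maximum), so four arguments must be passed in that order. The hypothetical helper below builds the same text with plain `String.format`; with plain `String.format`, dropping an argument throws `MissingFormatArgumentException` at runtime (how Druid's own logger handles such a mismatch is not covered here).

```java
import java.util.Locale;

/** Hypothetical helper that renders the suggested warning text; not part of the PR. */
class PayloadWarningSketch
{
  static String warningMessage(String taskId, String dataSource, long payloadSize, long maxTaskPayloadSize)
  {
    // Four placeholders, four arguments, in the same order.
    return String.format(
        Locale.ROOT,
        "Task[%s] of datasource[%s] has payload size[%d] larger than the recommended maximum[%d]."
        + " Large task payloads may cause stability issues in the Overlord and may fail while"
        + " persisting to the metadata store."
        + " Use smaller task payloads or increase 'druid.indexer.queue.maxTaskPayloadSize'"
        + " to suppress this warning.",
        taskId, dataSource, payloadSize, maxTaskPayloadSize
    );
  }
}
```

For example, `warningMessage("index_wiki_1", "wiki", 70000000L, 62914560L)` starts with `Task[index_wiki_1] of datasource[wiki] has payload size[70000000] larger than the recommended maximum[62914560].`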
##########
docs/configuration/index.md:
##########
@@ -1125,6 +1125,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself (for example, after a widespread network issue).|`PT1M`|
|`druid.indexer.queue.restartDelay`|Sleep this long when Overlord queue management throws an exception before trying again.|`PT30S`|
|`druid.indexer.queue.storageSyncRate`|Sync Overlord state this often with an underlying task persistence mechanism.|`PT1M`|
+|`druid.indexer.queue.maxTaskPayloadSize`|Maximum size in bytes of a single task payload.|`62914560`|
Review Comment:
If we decide to specify the config in human-readable bytes, then we can retain the name `maxTaskPayloadSize`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.