abhishekagarwal87 commented on code in PR #14887:
URL: https://github.com/apache/druid/pull/14887#discussion_r1334148644
##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -119,11 +119,15 @@ protected KubernetesPeonLifecycle(
* @return
* @throws IllegalStateException
*/
-  protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
+  protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, Boolean useDeepStorageForTaskPayload) throws IllegalStateException
Review Comment:
it seems inconsistent that some arguments are primitives (long) and some are objects (Boolean). What's the reason?
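For what it's worth, the practical difference behind this comment can be shown in isolation: a `Boolean` parameter admits null and throws on unboxing, while a primitive `boolean` cannot. A minimal sketch (class and method names are illustrative, not from the PR):

```java
public class BoxedBooleanDemo
{
  // A Boolean parameter can be null; unboxing it inside the method throws
  // NullPointerException. A primitive boolean rules this out at the call site.
  static boolean unbox(Boolean flag)
  {
    return flag; // implicit flag.booleanValue(): NPE when flag is null
  }

  public static void main(String[] args)
  {
    System.out.println(unbox(Boolean.TRUE)); // prints true
    try {
      unbox(null);
    }
    catch (NullPointerException e) {
      System.out.println("null Boolean unboxed -> NPE");
    }
  }
}
```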
##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -144,6 +148,28 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
}
}
+ private void writeTaskPayload(Task task)
+ {
+ try {
Review Comment:
nit - you could have avoided nested tries and handling the exception twice by declaring a null file object up front and deleting the file only when that object is not null.
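A minimal sketch of the restructuring being suggested — the `File` reference starts out null, there is a single try/catch/finally, and cleanup runs only if creation succeeded (names below are illustrative, not the PR's code):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TempFileCleanupSketch
{
  // One try block instead of two nested ones: the finally clause deletes the
  // temp file only when it was actually created.
  static void writePayload(String taskId, String payload)
  {
    File file = null;
    try {
      file = Files.createTempFile(taskId, "task.json").toFile();
      Files.write(file.toPath(), payload.getBytes());
      // taskLogs.pushTaskPayload(taskId, file) would go here in the real code
    }
    catch (IOException e) {
      throw new RuntimeException("Failed to write task payload", e);
    }
    finally {
      if (file != null) {
        file.delete();
      }
    }
  }

  public static void main(String[] args)
  {
    writePayload("task-abc", "{}");
    System.out.println("payload written and temp file cleaned up");
  }
}
```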
##########
processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tasklogs;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Something that knows how to manage task json paylods.
Review Comment:
The javadocs can be more descriptive — e.g. how it is used, where it is used, etc. "manage task json payloads" is a very broad description; it's only used in k8s task management.
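One possible rewrite of that javadoc, based only on how the interface is used in this PR (the k8s-only usage and the env-var size limit come from the surrounding review context):

```java
/**
 * Extension point for pushing a task's serialized payload (task.json) to deep
 * storage and streaming it back. Currently only used by the Kubernetes task
 * runner, which falls back to deep storage when the compressed payload is too
 * large to fit in the pod's TASK_JSON environment variable.
 */
```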
##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java:
##########
@@ -132,11 +140,36 @@ public Task toTask(Job from) throws IOException
Optional<EnvVar> taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();
String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null);
if (contents == null) {
- throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName());
+ log.info("No TASK_JSON environment variable found in pod: %s. Trying to load task payload from deep storage.", from.getMetadata().getName());
+ return toTaskUsingDeepStorage(from);
}
return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class);
}
+ private Task toTaskUsingDeepStorage(Job from) throws IOException
+ {
+ com.google.common.base.Optional<InputStream> taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
+ if (!taskBody.isPresent()) {
+ throw new IOE("Could not load task payload for job [%s]", from.getMetadata().getName());
Review Comment:
we should start using DruidException classes instead of IOE.
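A hedged sketch of what that might look like with Druid's `DruidException` builder (assuming the `forPersona`/`ofCategory` builder API available in recent Druid versions; not runnable standalone since it depends on Druid classes, and the persona/category choices are illustrative, not prescribed by the reviewer):

```java
// Sketch only: replaces the IOE with a structured DruidException.
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                    .ofCategory(DruidException.Category.RUNTIME_FAILURE)
                    .build("Could not load task payload for job [%s]", from.getMetadata().getName());
```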
##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java:
##########
@@ -89,14 +94,16 @@ public abstract class K8sTaskAdapter implements TaskAdapter
protected final StartupLoggingConfig startupLoggingConfig;
protected final DruidNode node;
protected final ObjectMapper mapper;
+ protected final TaskLogs taskLogs;
public K8sTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode node,
- ObjectMapper mapper
+ ObjectMapper mapper,
+ TaskLogs taskLogs
Review Comment:
the name of this class has been confusing since it does so much more than
dealing with task logs. could be fixed in some other PR someday.
##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -144,6 +148,28 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
}
}
+ private void writeTaskPayload(Task task)
+ {
+ try {
+ Path file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
+ try {
+ FileUtils.writeStringToFile(file.toFile(), mapper.writeValueAsString(task), Charset.defaultCharset());
+ taskLogs.pushTaskPayload(task.getId(), file.toFile());
Review Comment:
is it possible that a log cleanup job removes the task payload from deep storage while the task is still in progress? How are these payloads cleaned up from deep storage?
##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java:
##########
@@ -433,5 +475,18 @@ static ResourceRequirements getResourceRequirements(ResourceRequirements require
}
return requirements;
}
+
+ @Override
+ public Boolean shouldUseDeepStorageForTaskPayload(Task task)
+ {
+ try {
String compressedTaskPayload = Base64Compression.compressBase64(mapper.writeValueAsString(task));
+ return compressedTaskPayload.length() > DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
+ }
+ catch (Exception e) {
Review Comment:
are any of the calls in this method actually throwing an exception here? The task should be serializable. If the compiler is not forcing you to catch Exception, I would suggest you avoid doing it. At least avoid it here; let it bubble up and handle it at the top of the call stack. Otherwise, your code will be littered with unnecessary catch blocks.
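The alternative being described — declaring the checked exception rather than catching broadly — can be sketched with plain JDK types (the real method would declare Jackson's checked `JsonProcessingException`; everything below is illustrative):

```java
import java.io.IOException;

public class BubbleUpSketch
{
  // Declare the checked exception instead of wrapping the body in a local
  // catch (Exception e); callers handle it once, at the top of the stack.
  static int payloadSize(String payload) throws IOException
  {
    if (payload == null) {
      throw new IOException("null payload");
    }
    return payload.length();
  }

  public static void main(String[] args)
  {
    try {
      System.out.println(payloadSize("{}")); // prints 2
    }
    catch (IOException e) {
      // single handler at the top of the call stack
      System.out.println("handled at top: " + e.getMessage());
    }
  }
}
```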
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java:
##########
@@ -83,6 +83,21 @@ public Optional<InputStream> streamTaskStatus(String taskid) throws IOException
return streamTaskFileWithRetry(0, taskKey);
}
+ @Override
+ public void pushTaskPayload(String taskid, File taskPayloadFile) throws IOException
+ {
+ final String taskKey = getTaskLogKey(taskid, "task.json");
+ log.info("Pushing task payload %s to: %s", taskPayloadFile, taskKey);
Review Comment:
```suggestion
log.info("Pushing task payload [%s] to location [%s]", taskPayloadFile, taskKey);
```
##########
distribution/docker/peon.sh:
##########
@@ -150,6 +150,6 @@ then
fi
# take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json
Review Comment:
the comment needs an update?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]