abhishekagarwal87 commented on code in PR #14887:
URL: https://github.com/apache/druid/pull/14887#discussion_r1334148644


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -119,11 +119,15 @@ protected KubernetesPeonLifecycle(
    * @return
    * @throws IllegalStateException
    */
-  protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
+  protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, Boolean useDeepStorageForTaskPayload) throws IllegalStateException

Review Comment:
   it seems inconsistent that some arguments are primitives (long) and some are objects (Boolean). What's the reason?



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -144,6 +148,28 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
     }
   }
 
+  private void writeTaskPayload(Task task)
+  {
+    try {

Review Comment:
   nit - you could have avoided nested tries and handling the exception twice by declaring a null file object and deleting the file only when the said object is not null.



##########
processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tasklogs;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Something that knows how to manage task json payloads.

Review Comment:
   The javadocs could be more descriptive: how it is used, where it is used, etc. "manage task json payloads" is a very broad description. It's only used in k8s task management.
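For example, the javadoc could be expanded along these lines (the wording is only a sketch, inferred from how the PR uses the class):

```java
/**
 * Extension point for pushing a task payload (task.json) to deep storage and
 * streaming it back. Currently only used by the Kubernetes task runner
 * (kubernetes-overlord-extensions): when a task spec is too large to fit in
 * the TASK_JSON environment variable of the peon pod, the overlord pushes the
 * payload to deep storage and the peon loads it back from there.
 */
```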



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java:
##########
@@ -132,11 +140,36 @@ public Task toTask(Job from) throws IOException
     Optional<EnvVar> taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();
     String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null);
     if (contents == null) {
-      throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName());
+      log.info("No TASK_JSON environment variable found in pod: %s. Trying to load task payload from deep storage.", from.getMetadata().getName());
+      return toTaskUsingDeepStorage(from);
     }
     return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class);
   }
 
+  private Task toTaskUsingDeepStorage(Job from) throws IOException
+  {
+    com.google.common.base.Optional<InputStream> taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
+    if (!taskBody.isPresent()) {
+      throw new IOE("Could not load task payload for job [%s]", from.getMetadata().getName());

Review Comment:
   we should start using DruidException classes instead of IOE. 
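A hedged sketch of what the migration might look like (assuming Druid's `org.apache.druid.error.DruidException` builder with `forPersona`/`ofCategory`/`build`; the persona and category chosen here are illustrative, not prescriptive):

```java
// not self-contained: relies on Druid's DruidException; persona/category are illustrative
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                    .ofCategory(DruidException.Category.RUNTIME_FAILURE)
                    .build("Could not load task payload for job [%s]", from.getMetadata().getName());
```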



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java:
##########
@@ -89,14 +94,16 @@ public abstract class K8sTaskAdapter implements TaskAdapter
   protected final StartupLoggingConfig startupLoggingConfig;
   protected final DruidNode node;
   protected final ObjectMapper mapper;
+  protected final TaskLogs taskLogs;
 
   public K8sTaskAdapter(
       KubernetesClientApi client,
       KubernetesTaskRunnerConfig taskRunnerConfig,
       TaskConfig taskConfig,
       StartupLoggingConfig startupLoggingConfig,
       DruidNode node,
-      ObjectMapper mapper
+      ObjectMapper mapper,
+      TaskLogs taskLogs

Review Comment:
   the name of this class has been confusing since it does so much more than 
dealing with task logs. could be fixed in some other PR someday. 



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -144,6 +148,28 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
     }
   }
 
+  private void writeTaskPayload(Task task)
+  {
+    try {
+      Path file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
+      try {
+        FileUtils.writeStringToFile(file.toFile(), mapper.writeValueAsString(task), Charset.defaultCharset());
+        taskLogs.pushTaskPayload(task.getId(), file.toFile());

Review Comment:
   is it possible that a log cleanup job removes the task payload from the deep 
storage while task is still in progress? How are these payloads cleaned up from 
deep storage? 



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java:
##########
@@ -433,5 +475,18 @@ static ResourceRequirements getResourceRequirements(ResourceRequirements require
     }
     return requirements;
   }
+
+  @Override
+  public Boolean shouldUseDeepStorageForTaskPayload(Task task)
+  {
+    try {
+      String compressedTaskPayload = Base64Compression.compressBase64(mapper.writeValueAsString(task));
+      return compressedTaskPayload.length() > DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
+    }
+    catch (Exception e) {

Review Comment:
   is any of the call in the method actually throwing an exception here? The 
task should be serializable. If compiler is not forcing you to catch Exception, 
I would suggest you avoid doing it. At least avoid it here, let it bubble up 
and then handle at the top of the call stack. Otherwise, your call will be 
littered with unnecessary catch blocks. 
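As an illustration of letting the checked exception bubble up instead of catching `Exception` locally (a self-contained sketch; `serialize` is a hypothetical stand-in for `mapper.writeValueAsString`):

```java
import java.io.IOException;

public class BubbleUpSketch
{
  // Hypothetical stand-in for mapper.writeValueAsString(task); declares the
  // checked exception instead of swallowing it.
  static String serialize(Object task) throws IOException
  {
    return "{\"id\":\"" + task + "\"}";
  }

  // No catch block here: the checked exception simply bubbles up.
  static boolean shouldUseDeepStorage(Object task, int maxLen) throws IOException
  {
    return serialize(task).length() > maxLen;
  }

  public static void main(String[] args)
  {
    try {
      // one handler at the top of the call stack instead of scattered catches
      System.out.println(shouldUseDeepStorage("noop-task", 10));
    }
    catch (IOException e) {
      System.err.println("could not size task payload: " + e.getMessage());
    }
  }
}
```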



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java:
##########
@@ -83,6 +83,21 @@ public Optional<InputStream> streamTaskStatus(String taskid) throws IOException
     return streamTaskFileWithRetry(0, taskKey);
   }
 
+  @Override
+  public void pushTaskPayload(String taskid, File taskPayloadFile) throws IOException
+  {
+    final String taskKey = getTaskLogKey(taskid, "task.json");
+    log.info("Pushing task payload %s to: %s", taskPayloadFile, taskKey);

Review Comment:
   ```suggestion
       log.info("Pushing task payload [%s] to location [%s]", taskPayloadFile, taskKey);
   ```



##########
distribution/docker/peon.sh:
##########
@@ -150,6 +150,6 @@ then
 fi
 
 # take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json

Review Comment:
   the comment needs an update? 


