Radeity commented on code in PR #13194:
URL: https://github.com/apache/dolphinscheduler/pull/13194#discussion_r1049694760


##########
docs/docs/zh/faq.md:
##########
@@ -720,4 +720,18 @@ A:在 3.0.0-alpha 版本之后,Python gateway server 集成到 api server 
 
 ---
 
+## Q: 缓存执行时怎么判断任务已经存在缓存过的任务,即如何判断一个任务可以使用另外一个任务的运行结果?
+
+A: 对于标识为`缓存执行`的任务, 当任务启动时会生成一个缓存key, 该key由以下字段组合哈希得到:
+
+- 任务定义:任务实例对应的任务定义的id
+- 任务的版本:任务实例对应的任务定义的版本
+- 任务输入的参数:包括上游节点和全局参数传入的参数中,被任务定义的参数列表所引用和任务定义中使用`${}`引用的参数
+
+当缓存标识的任务运行时,会查找数据库中是否用相同缓存key的数据,
+- 若有则复制该任务实例并进行相应数据的更新
+- 若无,则任务照常运行,并在任务完成时将任务实例的数据存入缓存
+
+若不需要缓存时,可以在工作流实例中右键运行清除缓存,则会清楚该版本下当前输入的参数的缓存数据。

Review Comment:
   Typo here: "清楚" should be "清除" ("clear").



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskCacheUtils {
+
+    private TaskCacheUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    public static final String MERGE_TAG = "-";
+
+    /**
+     * generate cache key for task instance
+     * the follow message will be used to generate cache key
+     * 2. task version
+     * 3. task is cache
+     * 4. input VarPool, from upstream task and workflow global parameters
+     * @param taskInstance task instance
+     * @param taskExecutionContext taskExecutionContext
+     * @return cache key
+     */
+    public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext) {
+        List<String> keyElements = new ArrayList<>();
+        keyElements.add(String.valueOf(taskInstance.getTaskCode()));
+        keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion()));
+        keyElements.add(String.valueOf(taskInstance.getIsCache().getCode()));
+        keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext));
+        String data = StringUtils.join(keyElements, "_");
+        return DigestUtils.sha256Hex(data);
+    }
+
+    /**
+     * generate cache key for task instance which is cache execute
+     * this key will record which cache task instance will be copied, and cache key will be used
+     * tagCacheKey = sourceTaskId + "-" + cacheKey
+     * @param sourceTaskId source task id
+     * @param cacheKey cache key
+     * @return tagCacheKey
+     */
+    public static String generateTagCacheKey(Integer sourceTaskId, String cacheKey) {
+        return sourceTaskId + MERGE_TAG + cacheKey;
+    }
+
+    /**
+     * revert cache key tag to source task id and cache key
+     * @param tagCacheKey cache key
+     * @return Pair<Integer, String>, first is source task id, second is cache key
+     */
+    public static Pair<Integer, String> revertCacheKey(String tagCacheKey) {
+        Pair<Integer, String> taskIdAndCacheKey;
+        if (tagCacheKey == null) {
+            taskIdAndCacheKey = Pair.of(-1, "");
+            return taskIdAndCacheKey;
+        }
+        if (tagCacheKey.contains(MERGE_TAG)) {
+            String[] split = tagCacheKey.split(MERGE_TAG);
+            if (split.length == 2) {
+                taskIdAndCacheKey = Pair.of(Integer.parseInt(split[0]), split[1]);
+            } else {
+                taskIdAndCacheKey = Pair.of(-1, "");
+            }
+            return taskIdAndCacheKey;
+        } else {
+            return Pair.of(-1, tagCacheKey);
+        }
+    }
+
+    /**
+     * get hash data of task input var pool
+     * there are two parts of task input var pool: from upstream task and workflow global parameters
+     * @param taskInstance task instance
+     * taskExecutionContext taskExecutionContext
+     */
+    public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context) {
+        JsonNode taskParams = JSONUtils.parseObject(taskInstance.getTaskParams());
+
+        // The set of input values considered from localParams in the taskParams
+        Set<String> propertyInSet = JSONUtils.toList(taskParams.get("localParams").toString(), Property.class).stream()
+                .filter(property -> property.getDirect().equals(Direct.IN))
+                .map(Property::getProp).collect(Collectors.toSet());
+
+        // The set of input values considered from `${var}` form task definition
+        propertyInSet.addAll(getScriptVarInSet(taskInstance));

Review Comment:
   Should we also take environment variables into account, for example by adding the environment config as one of the cache key elements?
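
   To make the suggestion concrete, here is a rough, standalone sketch of how an environment config could be folded into the key. It is only an illustration under assumptions, not a proposal for the final code: the class name `CacheKeySketch` and the `environmentConfig` parameter are hypothetical, the method takes plain values instead of `TaskInstance` so it runs without DolphinScheduler on the classpath, and it uses the JDK's `MessageDigest` in place of `DigestUtils.sha256Hex`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

public class CacheKeySketch {

    // Hypothetical variant of generateCacheKey: same elements as in the PR,
    // plus one extra element for the environment config (an assumption).
    public static String generateCacheKey(long taskCode, int taskVersion, int isCacheCode,
                                          String inputVarPoolData, String environmentConfig) {
        List<String> keyElements = new ArrayList<>();
        keyElements.add(String.valueOf(taskCode));
        keyElements.add(String.valueOf(taskVersion));
        keyElements.add(String.valueOf(isCacheCode));
        keyElements.add(inputVarPoolData);
        // Assumption: a missing environment config is treated as an empty string.
        keyElements.add(environmentConfig == null ? "" : environmentConfig);
        return sha256Hex(String.join("_", keyElements));
    }

    // Hex-encoded SHA-256 using only the JDK.
    public static String sha256Hex(String data) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(data.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    public static void main(String[] args) {
        // Same task, version and inputs; only the environment config differs.
        String withJava8 = generateCacheKey(100L, 1, 1, "[]", "export JAVA_HOME=/opt/java8");
        String withJava11 = generateCacheKey(100L, 1, 1, "[]", "export JAVA_HOME=/opt/java11");
        System.out.println("keys equal: " + withJava8.equals(withJava11));
    }
}
```

   With something along these lines, two runs that differ only in their environment config would get different cache keys, so a result produced under one environment would not be reused under another.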


