Radeity commented on code in PR #13194: URL: https://github.com/apache/dolphinscheduler/pull/13194#discussion_r1050320279
########## dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.utils; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.databind.JsonNode; + +public class TaskCacheUtils { + + private TaskCacheUtils() { + throw new IllegalStateException("Utility class"); + } + + public static final String MERGE_TAG = "-"; + + /** + * generate cache key for task instance + * the follow message 
will be used to generate cache key + * 1. task code + * 2. task version + * 3. task is cache + * 4. input VarPool, from upstream task and workflow global parameters + * @param taskInstance task instance + * @param taskExecutionContext taskExecutionContext + * @return cache key + */ + public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext) { + List<String> keyElements = new ArrayList<>(); + keyElements.add(String.valueOf(taskInstance.getTaskCode())); + keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion())); + keyElements.add(String.valueOf(taskInstance.getIsCache().getCode())); + keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext)); + String data = StringUtils.join(keyElements, "_"); + return DigestUtils.sha256Hex(data); + } + + /** + * generate cache key for task instance which is cache execute + * this key will record which cache task instance will be copied, and cache key will be used + * tagCacheKey = sourceTaskId + "-" + cacheKey + * @param sourceTaskId source task id + * @param cacheKey cache key + * @return tagCacheKey + */ + public static String generateTagCacheKey(Integer sourceTaskId, String cacheKey) { + return sourceTaskId + MERGE_TAG + cacheKey; + } + + /** + * revert cache key tag to source task id and cache key + * @param tagCacheKey cache key + * @return Pair<Integer, String>, first is source task id, second is cache key + */ + public static Pair<Integer, String> revertCacheKey(String tagCacheKey) { + Pair<Integer, String> taskIdAndCacheKey; + if (tagCacheKey == null) { + taskIdAndCacheKey = Pair.of(-1, ""); + return taskIdAndCacheKey; + } + if (tagCacheKey.contains(MERGE_TAG)) { + String[] split = tagCacheKey.split(MERGE_TAG); + if (split.length == 2) { + taskIdAndCacheKey = Pair.of(Integer.parseInt(split[0]), split[1]); + } else { + taskIdAndCacheKey = Pair.of(-1, ""); + } + return taskIdAndCacheKey; + } else { + return Pair.of(-1, tagCacheKey); + } + } + + /** + * get
hash data of task input var pool + * there are two parts of task input var pool: from upstream task and workflow global parameters + * @param taskInstance task instance + * @param context taskExecutionContext + * @return hash data of task input var pool + */ + public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context) { + JsonNode taskParams = JSONUtils.parseObject(taskInstance.getTaskParams()); + + // The set of input values considered from localParams in the taskParams + Set<String> propertyInSet = JSONUtils.toList(taskParams.get("localParams").toString(), Property.class).stream() + .filter(property -> property.getDirect().equals(Direct.IN)) + .map(Property::getProp).collect(Collectors.toSet()); + + // The set of input values considered from `${var}` form task definition + propertyInSet.addAll(getScriptVarInSet(taskInstance)); Review Comment: > Does environment config mean the actual definition content about environment_code ? Yes, it is. I think it's better to consider it when generating the cache key in this PR for two reasons: - `EnvironmentConfig` can be directly fetched from either `TaskInstance` or `TaskExecutionContext`. - `EnvironmentConfig` is like `taskCode`, which DS takes charge of managing. In contrast, data from the worker or from the resource center may change, so users themselves should take that into consideration. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
