rmatharu commented on code in PR #1598: URL: https://github.com/apache/samza/pull/1598#discussion_r864343288
########## samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java: ########## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.elasticity; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class with util methods to be used for checkpoint computation when elasticity is enabled + * Elasticity is supported only for tasks created by either + * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or + * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper + */ +public 
class ElasticityUtils { + private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class); + + // GroupByPartition tasks have names like Partition 0_1_2 + // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor + // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT} + static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)"; + static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)"; + static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition "; + + //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2" + // where 2 is the elasticity factor + // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString} + static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)"; + static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]"; + static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition "; + + /** + * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks + * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2 + * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2 + * Both tasks have names ending with _%d where %d is the elasticity factor + * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task + * @return + * for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name + * for other tasks returns 1 which is the default elasticity factor + */ + static int getElasticityFactorFromTaskName(TaskName taskName) { + return getTaskNameParts(taskName).elasticityFactor; + } + + /** + * checks if the given taskname is of a GroupByPartition task + * @param 
taskName of any task + * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise + */ + static boolean isGroupByPartitionTask(TaskName taskName) { + return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX); + } + + /** + * checks if the given taskname is of a GroupBySystemStreamPartition task + * @param taskName of any task + * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise + */ + static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) { + return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX); + } + + /** + * checks if given taskName is elastic aka created with an elasticity factor > 1 + * @param taskName of any task + * @return true for following, false otherwise + * for task created by GroupByPartition, taskName has format "Partition 0_1_2" + * for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2" + */ + static boolean isTaskNameElastic(TaskName taskName) { + if (isGroupByPartitionTask(taskName)) { + Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX); + Matcher m = p.matcher(taskName.getTaskName()); + return m.find(); + } else if (isGroupBySystemStreamPartitionTask(taskName)) { + Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX); + Matcher m = p.matcher(taskName.getTaskName()); + return m.find(); + } + return false; + } + + /** + * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor + * @param taskName any taskName + * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor + * for GroupByPartition task: + * taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity) + * system and stream are empty "" strings and partition is the input partition, + * without elasticity, keyBucket = 
0 and elasticityFactor = 1 (the default values) + * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above) + * for GroupBySystemStreamPartition task: + * taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or + * "SystemStreamPartition [systemA, streamB, 0]" (without elasticity) + * system and stream and partition are from the name (ex system = systemA, stream = streamB, partition = 0 above) + * without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values) + * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above) + * for tasks created with other SSP groupers: + * default ElasticTaskNameParts is returned which has empty system, stream, + * -1 for partition and 0 for keyBucket and 1 for elasticity factor + */ + static ElasticTaskNameParts getTaskNameParts(TaskName taskName) { + if (isGroupByPartitionTask(taskName)) { + return getTaskNameParts_GroupByPartition(taskName); + } else if (isGroupBySystemStreamPartitionTask(taskName)) { + return getTaskNameParts_GroupBySSP(taskName); + } + log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. " + + "Elasticity is not supported for this taskName. " + + "Returning default ElasticTaskNameParts which has default keyBucket 0," + + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName()); + return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION); + } + + /** + * see doc for getTaskNameParts above + */ + static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) { Review Comment: private? ########## samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java: ########## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.elasticity; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class with util methods to be used for checkpoint computation when elasticity is enabled + * Elasticity is supported only for tasks created by either + * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or + * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper + */ +public class ElasticityUtils { + private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class); + + // GroupByPartition tasks have names like Partition 0_1_2 + // where 0 is the partition number, 1 is the key bucket 
and 2 is the elasticity factor + // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT} + static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)"; + static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)"; + static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition "; + + //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2" + // where 2 is the elasticity factor + // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString} + static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)"; + static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]"; + static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition "; + + /** + * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks + * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2 + * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2 + * Both tasks have names ending with _%d where %d is the elasticity factor + * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task + * @return + * for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name + * for other tasks returns 1 which is the default elasticity factor + */ + static int getElasticityFactorFromTaskName(TaskName taskName) { + return getTaskNameParts(taskName).elasticityFactor; + } + + /** + * checks if the given taskname is of a GroupByPartition task + * @param taskName of any task + * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise + */ + static boolean isGroupByPartitionTask(TaskName taskName) { + return 
taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX); + } + + /** + * checks if the given taskname is of a GroupBySystemStreamPartition task + * @param taskName of any task + * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise + */ + static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) { + return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX); + } + + /** + * checks if given taskName is elastic aka created with an elasticity factor > 1 + * @param taskName of any task + * @return true for following, false otherwise + * for task created by GroupByPartition, taskName has format "Partition 0_1_2" + * for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2" + */ + static boolean isTaskNameElastic(TaskName taskName) { + if (isGroupByPartitionTask(taskName)) { + Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX); + Matcher m = p.matcher(taskName.getTaskName()); + return m.find(); + } else if (isGroupBySystemStreamPartitionTask(taskName)) { + Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX); + Matcher m = p.matcher(taskName.getTaskName()); + return m.find(); + } + return false; + } + + /** + * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor + * @param taskName any taskName + * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor + * for GroupByPartition task: + * taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity) + * system and stream are empty "" strings and partition is the input partition, + * without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values) + * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above) + * for GroupBySystemStreamPartition task: + 
* taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or + * "SystemStreamPartition [systemA, streamB, 0]" (without elasticity) + * system and stream and partition are from the name (ex system = systemA, stream = streamB, partition = 0 above) + * without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values) + * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above) + * for tasks created with other SSP groupers: + * default ElasticTaskNameParts is returned which has empty system, stream, + * -1 for partition and 0 for keyBucket and 1 for elasticity factor + */ + static ElasticTaskNameParts getTaskNameParts(TaskName taskName) { + if (isGroupByPartitionTask(taskName)) { + return getTaskNameParts_GroupByPartition(taskName); + } else if (isGroupBySystemStreamPartitionTask(taskName)) { + return getTaskNameParts_GroupBySSP(taskName); + } + log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. " + + "Elasticity is not supported for this taskName.
" + + "Returning default ElasticTaskNameParts which has default keyBucket 0," + + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName()); + return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION); + } + + /** + * see doc for getTaskNameParts above + */ + static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) { + String taskNameStr = taskName.getTaskName(); + log.debug("GetTaskNameParts for taskName {}", taskNameStr); + Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX); + Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX); + + Matcher matcher = elasticTaskPattern.matcher(taskNameStr); + if (matcher.find()) { + return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)), + Integer.valueOf(matcher.group(2)), + Integer.valueOf(matcher.group(3))); + } + matcher = nonElasticTaskPattern.matcher(taskNameStr); + if (matcher.find()) { + return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1))); + } + log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr); + throw new IllegalArgumentException("TaskName format incompatible"); + } + + /** + * see doc for getTaskNameParts above + */ + static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) { Review Comment: private? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
