This is an automated email from the ASF dual-hosted git repository. dchen pushed a commit to branch elasticity-checkpoint-lastProcessedOffsets in repository https://gitbox.apache.org/repos/asf/samza.git
commit 2cda1742469b5dcb0a3a623544875d59cf6661e1 Author: Manasa <[email protected]> AuthorDate: Fri Apr 15 16:52:45 2022 -0700 SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled --- .../java/org/apache/samza/config/JobConfig.java | 9 + .../samza/elasticity/ElasticTaskNameParts.java | 80 ++++ .../apache/samza/elasticity/ElasticityUtils.java | 486 +++++++++++++++++++++ .../apache/samza/checkpoint/OffsetManager.scala | 34 +- .../org/apache/samza/config/TestJobConfig.java | 9 + .../samza/elasticity/TestElasticityUtils.java | 363 +++++++++++++++ 6 files changed, 972 insertions(+), 9 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java index 638474715..b637325a6 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java @@ -177,6 +177,11 @@ public class JobConfig extends MapConfig { public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED = "job.container.heartbeat.monitor.enabled"; private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT = true; + // if true, use checkpoints from previous deploys where elasticity was enabled + // set this to true if rolling back from elasticity to before elasticity. 
+ public static final String JOB_ELASTICITY_CHECKPOINTS_ENABLED = "job.elasticity.checkpoints.enabled"; + public static final boolean DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED = false; + // Enabled elasticity for the job // number of (elastic) tasks in the job will be old task count X elasticity factor @@ -479,6 +484,10 @@ public class JobConfig extends MapConfig { return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT); } + public boolean getElasticityCheckpointEnabled() { + return getBoolean(JOB_ELASTICITY_CHECKPOINTS_ENABLED, DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED); + } + public boolean getElasticityEnabled() { return getElasticityFactor() > 1; } diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java new file mode 100644 index 000000000..f9268d9be --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.elasticity; + +public class ElasticTaskNameParts { + + public static final int DEFAULT_KEY_BUCKET = 0; + public static final int DEFAULT_ELASTICITY_FACTOR = 1; + public static final int INVALID_PARTITION = -1; + + public final String system; + public final String stream; + public final int partition; + public final int keyBucket; + public final int elasticityFactor; + + public ElasticTaskNameParts(int partition) { + this(partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR); + } + + public ElasticTaskNameParts(int partition, int keyBucket, int elasticityFactor) { + this("", "", partition, keyBucket, elasticityFactor); + } + + public ElasticTaskNameParts(String system, String stream, int partition) { + this(system, stream, partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR); + } + + public ElasticTaskNameParts(String system, String stream, int partition, int keyBucket, int elasticityFactor) { + this.system = system; + this.stream = stream; + this.partition = partition; + this.keyBucket = keyBucket; + this.elasticityFactor = elasticityFactor; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ElasticTaskNameParts)) return false; + + ElasticTaskNameParts that = (ElasticTaskNameParts) o; + + if (!(this.system.equals(that.system)) + || !(this.stream.equals(that.stream)) + || (this.partition != that.partition) + || (this.keyBucket != that.keyBucket) + || (this.elasticityFactor != that.elasticityFactor)) { + return false; + } + return true; + } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + system.hashCode(); + result = prime * result + stream.hashCode(); + result = prime * result + partition; + result = prime * result + keyBucket; + result = prime * result + elasticityFactor; + return result; + } +} diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java 
b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java new file mode 100644 index 000000000..990702cd9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.elasticity; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class with util methods to be used for checkpoint computation when elasticity is enabled + * Elasticity is supported only for tasks created by either + * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or + * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper + */ +public class ElasticityUtils { + private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class); + + // GroupByPartition tasks have names like Partition 0_1_2 + // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor + // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT} + static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)"; + static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)"; + static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition "; + + //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2" + // where 2 is the elasticity factor + // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString} + static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), 
(\\d+)\\]_(\\d+)"; + static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]"; + static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition "; + + /** + * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks + * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2 + * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2 + * Both tasks have names ending with _%d where %d is the elasticity factor + * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task + * @return + * for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name + * for other tasks returns 1 which is the default elasticity factor + */ + static int getElasticityFactorFromTaskName(TaskName taskName) { + return getTaskNameParts(taskName).elasticityFactor; + } + + /** + * checks if the given taskname is of a GroupByPartition task + * @param taskName of any task + * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise + */ + static boolean isGroupByPartitionTask(TaskName taskName) { + return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX); + } + + /** + * checks if the given taskname is of a GroupBySystemStreamPartition task + * @param taskName of any task + * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise + */ + static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) { + return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX); + } + + /** + * checks if given taskName is elastic aka created with an elasticity factor > 1 + * @param taskName of any task + * @return true for following, false otherwise + * for task created by GroupByPartition, taskName has format "Partition 0_1_2" + * for task created by 
GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2" + */ + static boolean isTaskNameElastic(TaskName taskName) { + if (isGroupByPartitionTask(taskName)) { + Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX); + Matcher m = p.matcher(taskName.getTaskName()); + return m.find(); + } else if (isGroupBySystemStreamPartitionTask(taskName)) { + Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX); + Matcher m = p.matcher(taskName.getTaskName()); + return m.find(); + } + return false; + } + + /** + * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor + * @param taskName any taskName + * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor + * for GroupByPartition task: + * taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity) + * system and stream are empty "" strings and partition is the input partition, + * without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values) + * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above) + * for GroupBySystemStreamPartition task: + * taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or + * "SystemStreamPartition [systemA, streamB, 0]" (without elasticity) + * system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above) + * without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values) + * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above) + * for tasks created with other SSP groupers: + * default ElasticTaskNameParts is returned which has empty system, stream, + * -1 for partition and 0 for keyBucket and 1 for elasticity factor + */ + static ElasticTaskNameParts getTaskNameParts(TaskName taskName) { + if 
(isGroupByPartitionTask(taskName)) { + return getTaskNameParts_GroupByPartition(taskName); + } else if (isGroupBySystemStreamPartitionTask(taskName)) { + return getTaskNameParts_GroupBySSP(taskName); + } + log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. " + + "Elasticity is not supported for this taskName. " + + "Returning default ElasticTaskNameParts which has default keyBucket 0," + + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName()); + return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION); + } + + /** + * see doc for getTaskNameParts above + */ + static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) { + String taskNameStr = taskName.getTaskName(); + log.info("GetTaskNameParts for taskName {}", taskNameStr); + Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX); + Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX); + + Matcher matcher = elasticTaskPattern.matcher(taskNameStr); + if (matcher.find()) { + return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)), + Integer.valueOf(matcher.group(2)), + Integer.valueOf(matcher.group(3))); + } + matcher = nonElasticTaskPattern.matcher(taskNameStr); + if (matcher.find()) { + return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1))); + } + log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr); + throw new IllegalArgumentException("TaskName format incompatible"); + } + + /** + * see doc for getTaskNameParts above + */ + static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) { + String taskNameStr = taskName.getTaskName(); + log.info("GetTaskNameParts for taskName {}", taskNameStr); + Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX); + Pattern nonElasticTaskPattern = 
Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX); + + Matcher matcher = elasticTaskPattern.matcher(taskNameStr); + if (matcher.find()) { + return new ElasticTaskNameParts(matcher.group(1), + matcher.group(2), + Integer.valueOf(matcher.group(3)), + Integer.valueOf(matcher.group(4)), + Integer.valueOf(matcher.group(5))); + } + matcher = nonElasticTaskPattern.matcher(taskNameStr); + if (matcher.find()) { + return new ElasticTaskNameParts(matcher.group(1), + matcher.group(2), + Integer.valueOf(matcher.group(3))); + } + log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr); + throw new IllegalArgumentException("TaskName format incompatible"); + } + + /** + * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition]. + * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket] + * where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP + * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true + * all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same + * For example: + * case 1: elasticityFactor 2 to 1 + * otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1 + * currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2 + * currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2 + * SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1. 
Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2 + * case 2: elasticityFactor 2 to 2 - no change + * Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesn't change + * similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors + * case 3: elasticityFactor 4 to 2 + * otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2 + * currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4 + * currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4 + * From the computation of SSP_withKeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope} + * we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor; + * Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01. + * Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4 + * Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4 + * And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4 + * + * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers. + * aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition + * If currentTask and/or otherTask was created by any other SSPGrouper then false is returned. 
+ * @param currentTask + * @param otherTask + * @return true if otherTask is ancestor of currentTask, false otherwise + */ + static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) { + log.info("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask); + if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask)) + || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) { + return false; + } + + ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask); + ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask); + + if (!otherTaskNameParts.system.equals(currentTaskNameParts.system) + || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream) + || otherTaskNameParts.partition != currentTaskNameParts.partition + || otherTaskNameParts.elasticityFactor > currentTaskNameParts.elasticityFactor) { + return false; + } + + return (currentTaskNameParts.keyBucket % otherTaskNameParts.elasticityFactor) == otherTaskNameParts.keyBucket; + } + + /** + * See javadoc for isOtherTaskAncestorOfCurrentTask above + * Given currentTask and otherTask, + * if currentTask == otherTask, then its not a descendant. 
(unlike ancestor) + * else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask + * @param currentTask + * @param otherTask + * @return + */ + static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) { + log.info("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask); + if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask)) + || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) { + return false; + } + + ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask); + ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask); + + if (!otherTaskNameParts.system.equals(currentTaskNameParts.system) + || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream) + || otherTaskNameParts.partition != currentTaskNameParts.partition + || otherTaskNameParts.elasticityFactor <= currentTaskNameParts.elasticityFactor) { + return false; + } + + return (otherTaskNameParts.keyBucket % currentTaskNameParts.elasticityFactor) == currentTaskNameParts.keyBucket; + } + + /** + * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints + * All ancestor checkpoints are put into a set + * Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant. 
+ * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4) + * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>) + * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant + * @param taskName name of the task + * @param checkpointMap map from taskName to checkpoint + * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map + */ + static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints( + TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) { + Set<Checkpoint> ancestorCheckpoints = new HashSet<>(); + Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>(); + log.info("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName()); + checkpointMap.keySet().forEach(otherTaskName -> { + Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName); + if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) { + log.info("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName); + ancestorCheckpoints.add(otherTaskCheckpoint); + } + if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) { + log.info("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName); + int otherEF = getElasticityFactorFromTaskName(otherTaskName); + if (!descendantCheckpoints.containsKey(otherEF)) { + descendantCheckpoints.put(otherEF, new HashSet<>()); + } + descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint); + } + }); + log.info("done computing all ancestors and descendants of {}", taskName); + return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints); + } + + /** + * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the 
offset for the desired ssp + * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered. + * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket) + * or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint + * @param checkpoint Checkpoint containing SSP -> offset + * @param ssp SystemStreamPartition for which an offset needs to be fetched + * @return offset for the ssp in the Checkpoint or null if doesnt exist. + */ + static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) { + String checkpointStr = checkpoint.getOffsets().entrySet().stream() + .map(k -> k.getKey() + " : " + k.getValue()) + .collect(Collectors.joining(", ", "{", "}")); + log.info("for ssp {}, in checkpoint {}", ssp, checkpointStr); + + Optional<String> offsetFound = checkpoint.getOffsets().entrySet() + .stream() + .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey() + .getPartition() + .equals(ssp.getPartition())) + .map(Map.Entry::getValue) + .findFirst(); + if (offsetFound.isPresent()) { + return offsetFound.get(); + } + log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint); + return null; + } + + /** + * Given a set of checkpoints, find the max aka largest offset for an ssp + * Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system. + * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered. 
+ * @param checkpointSet set of checkpoints + * @param ssp for which largest offset is needed + * @param systemAdmin of the ssp.getSystem() + * @return offset - string if one exists else null + */ + static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet, + SystemStreamPartition ssp, SystemAdmin systemAdmin) { + return checkpointSet.stream() + .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp)) + .filter(Objects::nonNull) + .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first + .findFirst().orElse(null); + } + + /** + * Given a set of checkpoints, find the min aka smallest offset for an ssp + * Smallest is determined by the SystemAdmin.offsetCompartor of the ssp's system. + * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered. + * @param checkpointSet set of checkpoints + * @param ssp for which largest offset is needed + * @param systemAdmin of the ssp.getSystem() + * @return offset - string if one exists else null + */ + static String getMinOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet, + SystemStreamPartition ssp, SystemAdmin systemAdmin) { + return checkpointSet.stream() + .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp)) + .filter(Objects::nonNull) + .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2)) //confirm ascending sort - aka smallest offset first + .findFirst().orElse(null); + } + + /** + * Prereq: See javadoc for isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask to fully understand ancestor and descendant notion + * Briefly, Given tasks - Partition 0, Partition 0_0_2, Partition 0_1_2 and Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4 + * (recall Partition 0_1_2 means reads input partition 0, keyBucket 1 and elasticityFactor 2) + * For task Partition 0_0_2: ancestors = [Partition 0, Partition 0_0_2] 
and descendants = [Partition 0_0_4, Partition 0_2_4] + * + * If a task has no descendants, then we just need to pick the largest offset among all the ancestors to get the last processed offset. + * for example above, if Partition 0_0_2 only had ancestors and no descendants, taking largest offset among Partition 0 and 0_0_2 gives last proc offset. + * + * With descendants, a little care is needed. there could be descendants with different elasticity factors. + * given one elasticity factor, each descendant within the elasticity factor consumes a sub-portion (aka keyBucket) of the task. + * hence, to avoid data loss, we need to pick the lowest offset across descendants of the same elasticity factor. + * Across elasticity factors, largest works just like in ancestor + * + * Taking a concrete example + * From {@link org.apache.samza.system.IncomingMessageEnvelope} (IME) + * Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1 + * Partition 0_1_2 consuming all IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor=2 + * Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor=2 + * Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4 + * Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4 + * From the computation of SSP_withKeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope} + * we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor; + * Thus, + * SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1. + * SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01. 
+ * If the checkpoint map has + * Partition 0: (SSP : 1), Partition 0_0_2: (SSP0 : 2), Partition 0_1_2: (SSP1 : 3), Partition 0_0_4: (SSP0 : 4), Partition 0_2_4: (SSP1 : 6) + * looking at this map and knowing that offsets are monotonically increasing, it is clear that the last deploy was with elasticity factor = 4 + * to get checkpoint for Partition 0_0_2, we need to consider last deploy's offsets. + * picking 6 (offset for Partition 0_2_4) means that 0_0_2 will start proc from 6 but offset 5 was never processed. + * hence we need to take min of offsets within an elasticity factor. + * + * Given checkpoints for all the tasks in the checkpoint stream, + * when computing the last proc offset of an ssp for a task, + * the following steps need to be followed. + * 1. Ancestors: we need to take largest offset among ancestors for an ssp + * 2. Descendants: + * a. group descendants by their elasticityFactor. + * b. among descendants of the same elasticityFactor, take the smallest offset for an ssp + * c. once step b is done, we have (elasticityFactor : smallest-offset-for-ssp) set, pick the largest in this set + * 3. 
Pick the larger among the offsets received from step 1 (for ancestors) and step 2 (for descendants) + * + * @param taskName + * @param taskSSPSet + * @param checkpointMap + * @param systemAdmins + * @return + */ + public static Map<SystemStreamPartition, String> computeLastProcessedOffsetsFromCheckpointMap( + TaskName taskName, + Set<SystemStreamPartition> taskSSPSet, + Map<TaskName, Checkpoint> checkpointMap, + SystemAdmins systemAdmins) { + Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> acnestorsAndDescendantsFound = + getAncestorAndDescendantCheckpoints(taskName, checkpointMap); + Set<Checkpoint> ancestorCheckpoints = acnestorsAndDescendantsFound.getLeft(); + Map<Integer, Set<Checkpoint>> descendantCheckpoints = acnestorsAndDescendantsFound.getRight(); + + Map<SystemStreamPartition, String> taskSSPOffsets = new HashMap<>(); + + taskSSPSet.forEach(ssp_withKeyBucket -> { + log.info("for taskName {} and ssp of the task {}, finding its last proc offset", taskName, ssp_withKeyBucket); + + SystemStreamPartition ssp = new SystemStreamPartition(ssp_withKeyBucket.getSystemStream(), + ssp_withKeyBucket.getPartition()); + + SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem()); + + String currentLastOffsetForSSP = null; + + String ancestorLastOffsetForSSP = getMaxOffsetForSSPInCheckpointSet(ancestorCheckpoints, ssp, systemAdmin); + + log.info("for taskName {} and ssp {} got lastoffset from ancestors as {}", + taskName, ssp_withKeyBucket, ancestorLastOffsetForSSP); + + String descendantLastOffsetForSSP = descendantCheckpoints.entrySet().stream() + .map(entry -> getMinOffsetForSSPInCheckpointSet(entry.getValue(), ssp, systemAdmin)) // at each ef level, find min offset + .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first + .findFirst().orElse(null); + + log.info("for taskName {} and ssp {} got lastoffset from ancestors as {}", + taskName, ssp_withKeyBucket, 
descendantLastOffsetForSSP); + + Integer offsetComparison = systemAdmin.offsetComparator(ancestorLastOffsetForSSP, descendantLastOffsetForSSP); + if (offsetComparison != null && offsetComparison > 0) { // means ancestorLastOffsetForSSP > descendantLastOffsetForSSP + currentLastOffsetForSSP = ancestorLastOffsetForSSP; + } else { + currentLastOffsetForSSP = descendantLastOffsetForSSP; + } + log.info("for taskName {} and ssp {} got lastoffset as {}", taskName, ssp_withKeyBucket, currentLastOffsetForSSP); + taskSSPOffsets.put(ssp_withKeyBucket, currentLastOffsetForSSP); + }); + + String checkpointStr = taskSSPOffsets.entrySet().stream() + .map(k -> k.getKey() + " : " + k.getValue()) + .collect(Collectors.joining(", ", "{", "}")); + log.info("for taskName {}, returning checkpoint as {}", taskName, checkpointStr); + return taskSSPOffsets; + } + + public static boolean wasElasticityEnabled(Map<TaskName, Checkpoint> checkpointMap) { + return checkpointMap.keySet().stream() + .filter(ElasticityUtils::isTaskNameElastic) // true if the taskName has elasticityFactor in it + .findFirst().isPresent(); + } +} diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 7491eaaaf..7a12625f0 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -22,12 +22,13 @@ package org.apache.samza.checkpoint import java.util import java.util.HashMap import java.util.concurrent.ConcurrentHashMap - import org.apache.commons.lang3.StringUtils import org.apache.samza.SamzaException import org.apache.samza.annotation.InterfaceStability -import org.apache.samza.config.{Config, StreamConfig, SystemConfig} +import org.apache.samza.checkpoint.OffsetManager.info +import org.apache.samza.config.{Config, JobConfig, StreamConfig, SystemConfig} import org.apache.samza.container.TaskName 
+import org.apache.samza.elasticity.ElasticityUtils import org.apache.samza.startpoint.{Startpoint, StartpointManager} import org.apache.samza.system.SystemStreamMetadata.OffsetType import org.apache.samza.system._ @@ -105,7 +106,10 @@ object OffsetManager extends Logging { // Build OffsetSetting so we can create a map for OffsetManager. (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset)) }.toMap - new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics) + val elasticityCheckpointsEnabled = new JobConfig(config).getElasticityCheckpointEnabled + + new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners, + offsetManagerMetrics, elasticityCheckpointsEnabled) } } @@ -160,7 +164,12 @@ class OffsetManager( /** * offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition. */ - val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) extends Logging { + val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics, + + /** + * if true, checkpoints generated during elasticity deploys will be used for last processed offsets computation at container start + */ + val elasticityCheckpointsEnabled: Boolean = false) extends Logging { /** * Last offsets processed for each SystemStreamPartition. @@ -461,12 +470,19 @@ class OffsetManager( val checkpoint = checkpointManager.readLastCheckpoint(taskName) - if (checkpoint != null) { - Map(taskName -> checkpoint.getOffsets.asScala.toMap) - } else { - info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." 
format taskName) - Map(taskName -> Map()) + val checkpointMap = checkpointManager.readAllCheckpoints() + if (!elasticityCheckpointsEnabled || !ElasticityUtils.wasElasticityEnabled(checkpointMap)) { + if (checkpoint != null) { + return Map(taskName -> checkpoint.getOffsets.asScala.toMap) + } else { + info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName) + return Map(taskName -> Map()) + } } + info("Elasticity checkpoints is enabled and there was elasticity enabled in one of the previous deploys." + + "Last processed offsets computation at container start will use elasticity checkpoints if available.") + Map(taskName -> ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(taskName, + systemStreamPartitions.get(taskName).get.asJava, checkpointMap, systemAdmins).asScala) } /** diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java index 4d171662c..9cf70ff07 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java @@ -600,6 +600,15 @@ public class TestJobConfig { "false"))).getContainerHeartbeatMonitorEnabled()); } + @Test + public void testGetElasticityCheckpointEnabled() { + assertFalse(new JobConfig(new MapConfig()).getElasticityCheckpointEnabled()); + assertTrue(new JobConfig(new MapConfig( + ImmutableMap.of(JobConfig.JOB_ELASTICITY_CHECKPOINTS_ENABLED, "true"))).getElasticityCheckpointEnabled()); + assertFalse(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_CHECKPOINTS_ENABLED, + "false"))).getElasticityCheckpointEnabled()); + } + @Test public void testGetElastictyEnabled() { // greater than 1 means enabled diff --git a/samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java b/samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java new file mode 100644 
index 000000000..7590c9a3a --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.elasticity; + +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.Partition; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointId; +import org.apache.samza.checkpoint.CheckpointV2; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + + +// #TODO: going to make this entire class parametrized. 
+public class TestElasticityUtils { + private static final TaskName TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0"); + private static final TaskName ELASTIC_TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0_1_2"); + private static final TaskName TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0]"); + private static final TaskName ELASTIC_TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2"); + + @Test + public void testComputeLastProcessedOffsetsFromCheckpointMap() { + // Setup : + // there is one ssp = SystemStreamPartition [systemA, streamB, partition(0)] consumed by the job + // Note: Partition 0_1_2 means task consumes keyBucket 1 of partition 0 and has elasticityFactor 2. + // Before elasticity, job has one task with name "Partition 0" + // with elasticity factor 2, job has 2 tasks with names "Partition 0_0_2" and "Partition 0_1_2" + // Partition 0_0_2 consumes SSP[systemA, stream B, partition(0), keyBucket(0)] + // Partition 0_1_2 consumes SSP[systemA, stream B, partition(0), keyBucket(1)] + // with elasticity factor 4, job has 4 tasks with names "Partition 0_0_4", "Partition 0_1_4", "Partition 0_2_4" and "Partition 0_3_4" + // Partition 0_0_4 consumes SSP[systemA, stream B, partition(0), keyBucket(0)] + // Partition 0_1_4 consumes SSP[systemA, stream B, partition(0), keyBucket(1)] + // Partition 0_2_4 consumes SSP[systemA, stream B, partition(0), keyBucket(2)] + // Partition 0_3_4 consumes SSP[systemA, stream B, partition(0), keyBucket(3)] + + // + // From the definition of keyBucket computation using elasticity factor in + // {@link IncomingMessageEnvelope.getSystemStresamPartition(elasticityFactor) as + // keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor + // messages processed by 0_0_4 and 0_2_4 will be the same as those processed by 0_0_2 + // messages processed by 0_1_4 and 0_3_4 will be the same as those processed by 0_1_2 + // messages 
processed by 0_0_2 and 0_1_2 will be the same as those processed by Partition 0 itself + + TaskName taskName = new TaskName("Partition 0_0_2"); + Map<TaskName, Checkpoint> checkpointMap = new HashMap<>(); + SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0)); + SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0); + SystemStreamPartition ssp2 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 2); + + + SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class); + // offsets ordering 1 < 2 < 3 < 4 + Mockito.when(mockSystemAdmin.offsetComparator("1", "2")).thenReturn(-1); + Mockito.when(mockSystemAdmin.offsetComparator("2", "1")).thenReturn(1); + Mockito.when(mockSystemAdmin.offsetComparator("1", "3")).thenReturn(-1); + Mockito.when(mockSystemAdmin.offsetComparator("3", "1")).thenReturn(1); + Mockito.when(mockSystemAdmin.offsetComparator("1", "4")).thenReturn(-1); + Mockito.when(mockSystemAdmin.offsetComparator("4", "1")).thenReturn(1); + Mockito.when(mockSystemAdmin.offsetComparator("2", "3")).thenReturn(-1); + Mockito.when(mockSystemAdmin.offsetComparator("3", "2")).thenReturn(1); + Mockito.when(mockSystemAdmin.offsetComparator("2", "4")).thenReturn(-1); + Mockito.when(mockSystemAdmin.offsetComparator("4", "2")).thenReturn(1); + Mockito.when(mockSystemAdmin.offsetComparator("3", "4")).thenReturn(-1); + Mockito.when(mockSystemAdmin.offsetComparator("4", "3")).thenReturn(1); + + SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class); + Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin); + + // case 1: for task Partition 0_0_2: last deploy was with ef = 2 itself. 
+ // hence "Partition 0_0_2" has the largest offset and that should be used for computing checkpoint for 0_0_2 now also + checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1")); + checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "4")); + checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "2")); + checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "3")); + Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap( + taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins); + Assert.assertEquals("4", result.get(ssp0)); + + // case 2: for task Partition 0_0_2: last deploy was with ef =1 + // hence "Partition 0" has the largest offset. Computing checkpint for 0_0_2 should use this largest offset + checkpointMap = new HashMap<>(); + checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "4")); + checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "1")); + checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3")); + checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "2")); + + + result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap( + taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins); + Assert.assertEquals("4", result.get(ssp0)); + + + // case 3: for task partition 0_0_2: last deploy was with ef = 4 + // hence checkpoints of Partition 0_0_4 and Partition 0_3_4 are relevant. 
+ // since messages from both end up in 0_0_2 with ef=2, need to take min of their checkpointed offsets + + checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1")); + checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "2")); + checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3")); + checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "4")); + result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap( + taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins); + Assert.assertEquals("3", result.get(ssp0)); + } + + @Test + public void testTaskIsGroupByPartitionOrGroupBySSP() { + String msgPartition = "GroupByPartition task should start with Partition"; + String msgSsp = "GroupBySystemStreamPartition task should start with SystemStreamPartition"; + + Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_PARTITION)); + Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_PARTITION)); + + Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_PARTITION)); + Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask( + ELASTIC_TASKNAME_GROUP_BY_PARTITION)); + + Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_SSP)); + Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_SSP)); + + Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP)); + Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP)); + + TaskName taskName = new TaskName("FooBar"); + Assert.assertFalse(msgPartition, ElasticityUtils.isGroupByPartitionTask(taskName)); + Assert.assertFalse(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(taskName)); + } + + @Test + 
public void testIsTaskNameElastic() { + Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_SSP)); + Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_SSP)); + Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_PARTITION)); + Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_PARTITION)); + } + + @Test + public void testGetElasticTaskNameParts() { + ElasticTaskNameParts taskNameParts = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_PARTITION); + Assert.assertEquals(taskNameParts.partition, 0); + Assert.assertEquals(taskNameParts.keyBucket, ElasticTaskNameParts.DEFAULT_KEY_BUCKET); + Assert.assertEquals(taskNameParts.elasticityFactor, ElasticTaskNameParts.DEFAULT_ELASTICITY_FACTOR); + + taskNameParts = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_PARTITION); + Assert.assertEquals(taskNameParts.partition, 0); + Assert.assertEquals(taskNameParts.keyBucket, 1); + Assert.assertEquals(taskNameParts.elasticityFactor, 2); + + taskNameParts = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_SSP); + Assert.assertEquals(taskNameParts.system, "systemA"); + Assert.assertEquals(taskNameParts.stream, "streamB"); + Assert.assertEquals(taskNameParts.partition, 0); + Assert.assertEquals(taskNameParts.keyBucket, ElasticTaskNameParts.DEFAULT_KEY_BUCKET); + Assert.assertEquals(taskNameParts.elasticityFactor, ElasticTaskNameParts.DEFAULT_ELASTICITY_FACTOR); + + taskNameParts = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_SSP); + Assert.assertEquals(taskNameParts.system, "systemA"); + Assert.assertEquals(taskNameParts.stream, "streamB"); + Assert.assertEquals(taskNameParts.partition, 0); + Assert.assertEquals(taskNameParts.keyBucket, 1); + Assert.assertEquals(taskNameParts.elasticityFactor, 2); + + taskNameParts = ElasticityUtils.getTaskNameParts(new TaskName("FooBar")); + Assert.assertEquals(taskNameParts.partition, ElasticTaskNameParts.INVALID_PARTITION); + } 
+ + @Test + public void testIsOtherTaskAncestorDescendantOfCurrentTask() { + TaskName task0 = new TaskName("Partition 0"); + TaskName task1 = new TaskName("Partition 1"); + TaskName task002 = new TaskName("Partition 0_0_2"); + TaskName task012 = new TaskName("Partition 0_1_2"); + TaskName task004 = new TaskName("Partition 0_0_4"); + TaskName task014 = new TaskName("Partition 0_1_4"); + TaskName task024 = new TaskName("Partition 0_2_4"); + TaskName task034 = new TaskName("Partition 0_3_4"); + + TaskName sspTask0 = new TaskName("SystemStreamPartition [systemA, streamB, 0]"); + TaskName sspTask002 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_2"); + TaskName sspTask012 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2"); + TaskName sspTask004 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_4"); + TaskName sspTask014 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_4"); + TaskName sspTask024 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 2]_4"); + TaskName sspTask034 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 3]_4"); + + // Partition 0 is ancestor of all tasks Partition 0_0_2, 0_1_2, 0_0_4, 0_1_4, 0_2_4, 0_3_4 and itself + // and all these tasks are descendants of Partition 0 (except itself) + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task0)); + Assert.assertFalse(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task1)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task012, task0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task014, task0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task034, task0)); + + 
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task002)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task012)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task004)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task014)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task024)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task034)); + + // Partition 0_0_2 is ancestor of tasks Partition 0_0_4 and 0_2_4 and itself + // these tasks are descendants of 0_0_2 + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task002)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task002)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task002)); + + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task004)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task024)); + + // "SystemStreamPartition [systemA, streamB, 0] + // is ancestor of all tasks "SystemStreamPartition [systemA, streamB, 0, 0]_2, [systemA, streamB, 0, 1]_2 and the rest incl itself + // and all these tasks are descendants of Partition 0 (except itself) + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask0, sspTask0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask012, sspTask0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask014, sspTask0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask0)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask034, 
sspTask0)); + + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask002)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask012)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask004)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask014)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask024)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask034)); + + // SystemStreamPartition [systemA, streamB, 0, 0]_2 is ancestor of + // tasks SystemStreamPartition [systemA, streamB, 0, 0]_4, SystemStreamPartition [systemA, streamB, 0, 2]_4 and itself + // similarly, these tasks are descendants of SystemStreamPartition [systemA, streamB, 0, 0]_2 + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask002)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask002)); + Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask002)); + + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask004)); + Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask024)); + } + + @Test + public void testGetAncestorAndDescendantCheckpoints() { + TaskName taskName = new TaskName("Partition 0_0_2"); + Map<TaskName, Checkpoint> checkpointMap = new HashMap<>(); + SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0)); + Checkpoint ansCheckpoint1 = buildCheckpointV2(ssp, "1"); + Checkpoint ansCheckpoint2 = buildCheckpointV2(ssp, "2"); + Checkpoint desCheckpoint1 = buildCheckpointV2(ssp, "3"); + Checkpoint desCheckpoint2 = buildCheckpointV2(ssp, "4"); + Checkpoint unrelCheckpoint = buildCheckpointV2(ssp, "5"); + Set<Checkpoint> ansCheckpointSet = new 
HashSet<>(Arrays.asList(ansCheckpoint1, ansCheckpoint2)); + Set<Checkpoint> desCheckpointSet = new HashSet<>(Arrays.asList(desCheckpoint1, desCheckpoint2)); + + checkpointMap.put(new TaskName("Partition 0"), ansCheckpoint1); + checkpointMap.put(new TaskName("Partition 0_0_2"), ansCheckpoint2); + checkpointMap.put(new TaskName("Partition 0_0_4"), desCheckpoint1); + checkpointMap.put(new TaskName("Partition 0_2_4"), desCheckpoint2); + checkpointMap.put(new TaskName("Partition 0_1_4"), unrelCheckpoint); + + Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> result = + ElasticityUtils.getAncestorAndDescendantCheckpoints(taskName, checkpointMap); + Set<Checkpoint> anscestorCheckpointSet = result.getLeft(); + Set<Checkpoint> descendantCheckpointSetForEf4 = result.getRight().get(4); + + Assert.assertTrue("should contain all ancestors' checkpoints", + anscestorCheckpointSet.containsAll(ansCheckpointSet)); + Assert.assertFalse("should not contain a descendant checkpoint in anscetor list", + anscestorCheckpointSet.contains(desCheckpoint1)); + Assert.assertFalse("should not contain an unrelated checkpoint in ancestor list", + anscestorCheckpointSet.contains(unrelCheckpoint)); + + Assert.assertTrue("should contain all descendants' checkpoints", + descendantCheckpointSetForEf4.containsAll(desCheckpointSet)); + Assert.assertFalse("should not contain a anscetor checkpoint in descendant list", + descendantCheckpointSetForEf4.contains(ansCheckpoint1)); + Assert.assertFalse("should not contain an unrelated checkpoint in descendant list", + descendantCheckpointSetForEf4.contains(unrelCheckpoint)); + } + + @Test + public void testGetOffsetForSSPInCheckpoint() { + String offset1 = "1111"; + String offset2 = "2222"; + // case 1: when looking for exact ssp + SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0)); + Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1); + 
Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset1); + + // case 2: checkpoint has ssp with key bucket but looking for the full ssp (same system stream and partition but without keybucket) + SystemStreamPartition sspWithKB = new SystemStreamPartition("systemA", "streamB", new Partition(0), 1); + checkpoint1 = buildCheckpointV2(sspWithKB, offset2); + Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset2); + + // case 3: try getting offset for an ssp not present in the checkpoint -> should return null + SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(1)); + Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp2), null); + } + + @Test + public void testGetMaxMinOffsetForSSPInCheckpointSet() { + String offset1 = "1111"; + String offset2 = "2222"; + + SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0)); + Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1); + Checkpoint checkpoint2 = buildCheckpointV2(ssp, offset2); + Set<Checkpoint> checkpointSet = new HashSet<>(Arrays.asList(checkpoint1, checkpoint2)); + + SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class); + // offset 1 < offset2 + Mockito.when(mockSystemAdmin.offsetComparator(offset1, offset2)).thenReturn(-1); + Mockito.when(mockSystemAdmin.offsetComparator(offset2, offset1)).thenReturn(1); + + // case 1: when exact ssp is in checkpoint set + Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin)); + Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin)); + + // case 2: when looking for ssp with keyBucket 1 whereas checkpoint set only has full ssp (same system stream and partition but without keybucket) + SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, 1); + 
Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin)); + Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin)); + + + // case 3: when ssp not in checkpoint set -> should receive null for min and max offset + SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(0)); + Assert.assertEquals(null, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin)); + Assert.assertEquals(null, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin)); + } + + private static CheckpointV2 buildCheckpointV2(SystemStreamPartition ssp, String offset) { + return new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, offset), + ImmutableMap.of("backend", ImmutableMap.of("store", "10"))); + } +}
