xiefan46 commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r861203346


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled.
+ * Elasticity is supported only for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper.
+ */
+public class ElasticityUtils {
+  private static final Logger log = 
LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like "Partition 0_1_2"
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition 
(\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  // GroupBySSP tasks have names like "SystemStreamPartition [<system>, <stream>, <partition>, <keyBucket>]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = 
"SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition 
\\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Reads the elasticity factor that is encoded in a task name.
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks.
+   * When elasticity is enabled, a GroupByPartition task is named like "Partition 0_1_2" and
+   * a GroupBySystemStreamPartition task like "SystemStreamPartition [systemA, streamB, 0, 1]_2".
+   * In both formats the trailing _%d is the elasticity factor.
+   * @param taskName name of either a GroupByPartition or a GroupBySystemStreamPartition task
+   * @return
+   *   the elasticity factor taken from the task name for GroupByPartition and
+   *   GroupBySystemStreamPartition tasks,
+   *   1 (the default elasticity factor) for all other tasks
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    ElasticTaskNameParts parsedName = getTaskNameParts(taskName);
+    return parsedName.elasticityFactor;
+  }
+
+  /**
+   * Tells whether a task name belongs to a GroupByPartition task.
+   * Only the prefix of the name is inspected; the rest of the name is not validated here.
+   * @param taskName name of any task
+   * @return true if the name starts with the prefix "Partition ", false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    String name = taskName.getTaskName();
+    return name.startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * Tells whether a task name belongs to a GroupBySystemStreamPartition task.
+   * Only the prefix of the name is inspected; the rest of the name is not validated here.
+   * @param taskName name of any task
+   * @return true if the name starts with the prefix "SystemStreamPartition ", false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    String name = taskName.getTaskName();
+    return name.startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor 
> 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 
0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format 
"SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * Extracts the values for system, stream, partition, keyBucket and elasticityFactor from a task name.
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor
+   *    for a GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket is taken from the name (1 above) and so is elasticityFactor (2 above)
+   *    for a GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system, stream and partition are taken from the name
+   *         (system = systemA, stream = streamB, partition = 0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket is taken from the name (1 above) and so is elasticityFactor (2 above)
+   *    for tasks created with other SSP groupers:
+   *         a default ElasticTaskNameParts is returned which has empty system and stream,
+   *         -1 for partition, 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    }
+    if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+
+    // any other grouper: elasticity is not supported, hand back the defaults with an invalid partition
+    String unsupportedTaskName = taskName.getTaskName();
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", unsupportedTaskName);
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * Parses the name of a GroupByPartition task into its parts.
+   * The elastic format "Partition 0_1_2" is tried first and supplies partition, keyBucket and
+   * elasticityFactor; otherwise the non-elastic format "Partition 0" supplies only the partition.
+   * See the doc of getTaskNameParts for the meaning of the parts.
+   * @param taskName name of a GroupByPartition task
+   * @return ElasticTaskNameParts built from the numbers found in the task name
+   * @throws IllegalArgumentException if neither format is found in the task name;
+   *    the message carries the offending task name
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+
+    // the elastic format has to be tried first: the non-elastic regex is also found inside an elastic name
+    Matcher elasticMatcher = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX).matcher(taskNameStr);
+    if (elasticMatcher.find()) {
+      return new ElasticTaskNameParts(Integer.parseInt(elasticMatcher.group(1)),
+          Integer.parseInt(elasticMatcher.group(2)),
+          Integer.parseInt(elasticMatcher.group(3)));
+    }
+
+    Matcher nonElasticMatcher = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX).matcher(taskNameStr);
+    if (nonElasticMatcher.find()) {
+      return new ElasticTaskNameParts(Integer.parseInt(nonElasticMatcher.group(1)));
+    }
+
+    log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.",
+        taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible: " + taskNameStr);
+  }
+
+  /**
+   * Parses the name of a GroupBySystemStreamPartition task into its parts.
+   * The elastic format "SystemStreamPartition [systemA, streamB, 0, 1]_2" is tried first and supplies
+   * system, stream, partition, keyBucket and elasticityFactor; otherwise the non-elastic format
+   * "SystemStreamPartition [systemA, streamB, 0]" supplies system, stream and partition only.
+   * See the doc of getTaskNameParts for the meaning of the parts.
+   * @param taskName name of a GroupBySystemStreamPartition task
+   * @return ElasticTaskNameParts built from the values found in the task name
+   * @throws IllegalArgumentException if neither format is found in the task name;
+   *    the message carries the offending task name
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+
+    // elastic names carry two extra numbers (keyBucket inside the brackets, elasticity factor after them)
+    Matcher elasticMatcher = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX).matcher(taskNameStr);
+    if (elasticMatcher.find()) {
+      return new ElasticTaskNameParts(elasticMatcher.group(1),
+          elasticMatcher.group(2),
+          Integer.parseInt(elasticMatcher.group(3)),
+          Integer.parseInt(elasticMatcher.group(4)),
+          Integer.parseInt(elasticMatcher.group(5)));
+    }
+
+    Matcher nonElasticMatcher = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX).matcher(taskNameStr);
+    if (nonElasticMatcher.find()) {
+      return new ElasticTaskNameParts(nonElasticMatcher.group(1),
+          nonElasticMatcher.group(2),
+          Integer.parseInt(nonElasticMatcher.group(3)));
+    }
+
+    log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.",
+        taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible: " + taskNameStr);
+  }
+
+  /**
+   * Decides whether otherTask is an ancestor of currentTask.
+   *
+   * Without elasticity, a task consumes an entire (full) SSP = [system, stream, partition].
+   * With elasticity, a task consumes a portion of the SSP, SSP_withKeyBucket = [system, stream, partition, keyBucket]
+   *    where {@code 0 <= keyBucket < elasticityFactor}, and it contains a subset of the
+   *    IncomingMessageEnvelope(IME) from the full SSP.
+   * Given two tasks currentTask and otherTask, otherTask is called ancestor of currentTask if the following is true:
+   *    all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
+   *    For example:
+   *      case 1: elasticityFactor 2 to 1
+   *            otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor = 1
+   *            currentTask1 = Partition 0_0_2 consumes IME in
+   *                SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
+   *            currentTask2 = Partition 0_1_2 consumes IME in
+   *                SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
+   *            SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1.
+   *            Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
+   *      case 2: elasticityFactor 2 to 2 - no change
+   *            Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesn't change,
+   *            similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
+   *      case 3: elasticityFactor 4 to 2
+   *            otherTask = Partition 0_0_2 consuming all IME in
+   *                SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor = 2
+   *            currentTask1 = Partition 0_0_4 consumes IME in
+   *                SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *            currentTask2 = Partition 0_2_4 consumes IME in
+   *                SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *            From the computation of SSP_withKeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *            we have getSystemStreamPartition(int elasticityFactor) which does
+   *            keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *            Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *            Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
+   *            Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
+   *            And transitively, Partition 0 is ancestor of
+   *            Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   *
+   * This applies to tasks created by the GroupByPartition and GroupBySystemStreamPartition SSPGroupers,
+   * aka it applies if both currentTask and otherTask are created by GroupByPartition
+   * or both are created by GroupBySystemStreamPartition.
+   * If currentTask and/or otherTask were created by other SSPGroupers then false is returned.
+   * @param currentTask task whose ancestor is looked for
+   * @param otherTask candidate ancestor
+   * @return true if otherTask is ancestor of currentTask, false otherwise
+   */
+  static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+
+    boolean createdBySameSupportedGrouper =
+        (isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+            || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask));
+    if (!createdBySameSupportedGrouper) {
+      return false;
+    }
+
+    ElasticTaskNameParts current = getTaskNameParts(currentTask);
+    ElasticTaskNameParts other = getTaskNameParts(otherTask);
+
+    // an ancestor reads the same [system, stream, partition] with an elasticity factor that is not larger
+    boolean sameInput = other.system.equals(current.system)
+        && other.stream.equals(current.stream)
+        && other.partition == current.partition;
+    if (!sameInput || other.elasticityFactor > current.elasticityFactor) {
+      return false;
+    }
+
+    // the current keyBucket has to fold into the keyBucket of the other task under the other elasticity factor
+    int foldedKeyBucket = current.keyBucket % other.elasticityFactor;
+    return foldedKeyBucket == other.keyBucket;
+  }
+
+  /**
+   * Decides whether otherTask is a descendant of currentTask.
+   * The ancestor relation this mirrors is described in the javadoc of isOtherTaskAncestorOfCurrentTask.
+   * Given currentTask and otherTask,
+   *   if currentTask == otherTask, then it is not a descendant (unlike ancestor);
+   *   else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask
+   * If currentTask and/or otherTask were not created by the same one of the GroupByPartition and
+   * GroupBySystemStreamPartition SSPGroupers then false is returned.
+   * @param currentTask task whose descendant is looked for
+   * @param otherTask candidate descendant
+   * @return true if otherTask is descendant of currentTask, false otherwise
+   */
+  static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+
+    boolean createdBySameSupportedGrouper =
+        (isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+            || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask));
+    if (!createdBySameSupportedGrouper) {
+      return false;
+    }
+
+    ElasticTaskNameParts current = getTaskNameParts(currentTask);
+    ElasticTaskNameParts other = getTaskNameParts(otherTask);
+
+    // a descendant reads the same [system, stream, partition] with a strictly larger elasticity factor
+    boolean sameInput = other.system.equals(current.system)
+        && other.stream.equals(current.stream)
+        && other.partition == current.partition;
+    if (!sameInput || other.elasticityFactor <= current.elasticityFactor) {
+      return false;
+    }
+
+    // the other keyBucket has to fold into the keyBucket of the current task under the current elasticity factor
+    int foldedKeyBucket = other.keyBucket % current.elasticityFactor;
+    return foldedKeyBucket == current.keyBucket;
+  }
+
+  /**
+   * For a given taskName and a map of task names to checkpoints, returns the checkpoints of the taskName's
+   * ancestors and descendants.
+   * All ancestor checkpoints are put into a set.
+   * Descendant checkpoints are put into a map of elasticityFactor to descendant checkpoints
+   * where the elasticityFactor is that of the descendant.
+   * For example, given taskName Partition 0_0_2 and checkpoint map
+   * (Partition 0 -> C1, Partition 0_0_4 -> C2, Partition 0_1_4 -> C3, Partition 0_2_4 -> C4)
+   * the return value is ancestorSet = {C1} and descendantMap = (4 -> {C2, C4})
+   * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask
+   * for the definition of ancestor and descendant.
+   * @param taskName name of the task
+   * @param checkpointMap map from taskName to checkpoint
+   * @return Pair of ancestor checkpoint set and descendant checkpoint map
+   */
+  static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
+      TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
+    Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
+    log.info("starting to parse the checkpoint map to find ancestors and descendants for taskName {}",
+        taskName.getTaskName());
+
+    // walk the entries so that each checkpoint comes with its key instead of being looked up per key
+    for (Map.Entry<TaskName, Checkpoint> entry : checkpointMap.entrySet()) {
+      TaskName otherTaskName = entry.getKey();
+      Checkpoint otherTaskCheckpoint = entry.getValue();
+
+      if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is ancestor",
+            taskName, otherTaskName);
+        ancestorCheckpoints.add(otherTaskCheckpoint);
+      }
+      if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is descendant",
+            taskName, otherTaskName);
+        int otherEF = getElasticityFactorFromTaskName(otherTaskName);
+        // descendants are grouped by their own elasticity factor
+        descendantCheckpoints.computeIfAbsent(otherEF, ef -> new HashSet<>()).add(otherTaskCheckpoint);
+      }
+    }
+
+    log.info("done computing all ancestors and descendants of {}", taskName);
+    return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
+  }
+
+  /**
+   * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp.
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket)
+   * or consume exactly one of the keyBuckets of an SSP.
+   * Hence there will be at most one entry for an SSP in a checkpoint.
+   * @param checkpoint Checkpoint containing SSP -> offset
+   * @param ssp SystemStreamPartition for which an offset needs to be fetched
+   * @return offset for the ssp in the Checkpoint or null if it doesn't exist.
+   */
+  static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
+    Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
+
+    // the readable dump of the checkpoint is only needed for the info log, so skip building it otherwise
+    if (log.isInfoEnabled()) {
+      String checkpointStr = offsets.entrySet().stream()
+          .map(k -> k.getKey() + " : " + k.getValue())
+          .collect(Collectors.joining(", ", "{", "}"));
+      log.info("for ssp {}, in checkpoint {}", ssp, checkpointStr);
+    }
+
+    // match on system stream and partition only; the keyBucket of the checkpointed SSP is ignored
+    Optional<String> offsetFound = offsets.entrySet().stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream())
+            && entry.getKey().getPartition().equals(ssp.getPartition()))
+        .map(Map.Entry::getValue)
+        .findFirst();
+    if (offsetFound.isPresent()) {
+      return offsetFound.get();
+    }
+    log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
+    return null;
+  }
+
+  /**
+   * Given a set of checkpoints, find the max aka largest offset for an ssp
+   * Largest is determined by the SystemAdmin.offsetComparator of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> 
checkpointSet,

Review Comment:
   I see! Thanks for answering my question! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to