This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch elasticity-checkpoint-lastProcessedOffsets
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 2cda1742469b5dcb0a3a623544875d59cf6661e1
Author: Manasa <[email protected]>
AuthorDate: Fri Apr 15 16:52:45 2022 -0700

    SAMZA-2733: [Elasticity] Compute last processed offsets when container 
starts up using checkpoints from previous deploys when elasticity was enabled
---
 .../java/org/apache/samza/config/JobConfig.java    |   9 +
 .../samza/elasticity/ElasticTaskNameParts.java     |  80 ++++
 .../apache/samza/elasticity/ElasticityUtils.java   | 486 +++++++++++++++++++++
 .../apache/samza/checkpoint/OffsetManager.scala    |  34 +-
 .../org/apache/samza/config/TestJobConfig.java     |   9 +
 .../samza/elasticity/TestElasticityUtils.java      | 363 +++++++++++++++
 6 files changed, 972 insertions(+), 9 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 638474715..b637325a6 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -177,6 +177,11 @@ public class JobConfig extends MapConfig {
   public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED = 
"job.container.heartbeat.monitor.enabled";
   private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT = 
true;
 
+  // if true, use checkpoints from previous deploys where elasticity was 
enabled
+  // set this to true if rolling back from elasticity to before elasticity.
+  public static final String JOB_ELASTICITY_CHECKPOINTS_ENABLED = 
"job.elasticity.checkpoints.enabled";
+  public static final boolean DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED = 
false;
+
 
   // Enabled elasticity for the job
   // number of (elastic) tasks in the job will be old task count X elasticity 
factor
@@ -479,6 +484,10 @@ public class JobConfig extends MapConfig {
     return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, 
CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
   }
 
+  public boolean getElasticityCheckpointEnabled() {
+    return getBoolean(JOB_ELASTICITY_CHECKPOINTS_ENABLED, 
DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED);
+  }
+
   public boolean getElasticityEnabled() {
     return getElasticityFactor() > 1;
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java
 
b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java
new file mode 100644
index 000000000..f9268d9be
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+public class ElasticTaskNameParts {
+
+  public static final int DEFAULT_KEY_BUCKET = 0;
+  public static final int DEFAULT_ELASTICITY_FACTOR = 1;
+  public static final int INVALID_PARTITION = -1;
+
+  public final String system;
+  public final String stream;
+  public final int partition;
+  public final int keyBucket;
+  public final int elasticityFactor;
+
+  public ElasticTaskNameParts(int partition) {
+    this(partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR);
+  }
+
+  public ElasticTaskNameParts(int partition, int keyBucket, int 
elasticityFactor) {
+    this("", "", partition, keyBucket, elasticityFactor);
+  }
+
+  public ElasticTaskNameParts(String system, String stream, int partition) {
+    this(system, stream, partition, DEFAULT_KEY_BUCKET, 
DEFAULT_ELASTICITY_FACTOR);
+  }
+
+  public ElasticTaskNameParts(String system, String stream, int partition, int 
keyBucket, int elasticityFactor) {
+    this.system = system;
+    this.stream = stream;
+    this.partition = partition;
+    this.keyBucket = keyBucket;
+    this.elasticityFactor = elasticityFactor;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof ElasticTaskNameParts)) return false;
+
+    ElasticTaskNameParts that = (ElasticTaskNameParts) o;
+
+    if (!(this.system.equals(that.system))
+        || !(this.stream.equals(that.stream))
+        || (this.partition != that.partition)
+        || (this.keyBucket != that.keyBucket)
+        || (this.elasticityFactor != that.elasticityFactor)) {
+      return false;
+    }
+    return true;
+  }
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + system.hashCode();
+    result = prime * result + stream.hashCode();
+    result = prime * result + partition;
+    result = prime * result + keyBucket;
+    result = prime * result + elasticityFactor;
+    return result;
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java 
b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java
new file mode 100644
index 000000000..990702cd9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when 
elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP 
grouper or
+ * the {@link 
org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP 
grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = 
LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the 
elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition 
(\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, 
<Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link 
SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = 
"SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition 
\\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and 
GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 
0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names 
SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or 
GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns 
elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or 
false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return 
taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix 
"SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor 
> 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 
0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format 
"SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * From given taskName extract the values for system, stream, partition, 
keyBucket and elasticityFactor
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, 
keyBucket and elasticityFactor
+   *    for GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) 
or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input 
partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the 
default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and 
elasticityFactor (ex 2 from above)
+   *    for GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, 
streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system and stream and partition are from the name (ex system = 
systemA, stream = streamB, partition = 0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the 
default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and 
elasticityFactor (ex 2 from above)
+   *   for tasks created with other SSP groupers:
+   *        default ElasticTaskNameParts is returned which has empty system, 
stream,
+   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    log.warn("TaskName {} is neither GroupByPartition nor 
GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 
0,"
+        + " default elasticityFactor 1 and invalid partition -1", 
taskName.getTaskName());
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName 
taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = 
Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+    Pattern nonElasticTaskPattern = 
Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)),
+          Integer.valueOf(matcher.group(2)),
+          Integer.valueOf(matcher.group(3)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)));
+    }
+    log.error("Could not extract partition, keybucket and elasticity factor 
from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = 
Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+    Pattern nonElasticTaskPattern = 
Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)),
+          Integer.valueOf(matcher.group(4)),
+          Integer.valueOf(matcher.group(5)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)));
+    }
+    log.warn("Could not extract system, stream, partition, keybucket and 
elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * Without elasticity, a task consumes an entire (full) SSP = [System, 
stream, partition].
+   * With elasticity, a task consumes a portion of the SSP_withKeyBucket = 
[system, stream, partition, keyBucket]
+   *    where 0 <= keyBucket < elasticityFactor and contains a subset of the 
IncomingMessageEnvelope(IME) from the full SSP
+   * Given two tasks currentTask and otherTask, the task otherTask is called 
ancestor of currentTask if the following is true
+   *    all IME consumed by currentTask will be consumed by otherTask when 
elasticityFactor decreases or stays same
+   *    For example:
+   *      case 1: elasticityFactor 2 to 1
+   *            otherTask = Partition 0 consuming all IME in SSP = [systemA, 
streamB, 0] when elasticityFactor=1
+   *            currentTask1 = Partition 0_0_2 consumes IME in 
SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor 
= 2
+   *            currentTask2 = Partition 0_1_2 consumes IME in 
SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor 
= 2
+   *            SSP =  SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, 
Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
+   *      case 2: elasticityFactor 2 to 2 - no change
+   *            Partition 0_0_2 is an ancestor of itself since the input 
SSP_withKeyBucket0 doesnt change
+   *            similarly Partition 0_1_2 is an ancestor of itself. This 
applies to all elasticityFactors
+   *      case 3: elasticityFactor 4 to 2
+   *            otherTask = Partition 0_0_2 consuming all IME in 
SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
+   *            currentTask1 = Partition 0_0_4 consumes IME in 
SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when 
elasticityFactor = 4
+   *            currentTask2 = Partition 0_2_4 consumes IME in 
SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when 
elasticityFactor = 4
+   *            From the computation of SSP_withkeyBucket in {@link 
org.apache.samza.system.IncomingMessageEnvelope}
+   *            we have getSystemStreamPartition(int elasticityFactor) which 
does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % 
elasticityFactor;
+   *            Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + 
SSP_withKeyBucket01.
+   *            Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and 
Partition 0_2_4
+   *            Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and 
Partition 0_3_4
+   *            And transitively, Partition 0 is ancestor of Partition 0_0_4, 
Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   *
+   * This applies to tasks created by GroupByPartition and 
GroupBySystemStreamPartition SSPGroupers.
+   * aka this applies if both currentTask and otherTask are created by 
GroupByPartition or both are created by GroupBySystemStreamPartition
+   * If either currentTask and/or otherTask were created by other SSPGroupers 
then false is returned.
+   * @param currentTask
+   * @param otherTask
+   * @return true if otherTask is ancestor of currentTask, false otherwise
+   */
+  static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, 
TaskName otherTask) {
+    log.info("isOtherTaskAncestorOfCurrentTask with currentTask {} and 
otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && 
isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && 
isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor > 
currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (currentTaskNameParts.keyBucket % 
otherTaskNameParts.elasticityFactor) == otherTaskNameParts.keyBucket;
+  }
+
+  /**
+   * See javadoc for isOtherTaskAncestorOfCurrentTask above
+   * Given currentTask and otherTask,
+   *   if currentTask == otherTask, then its not a descendant. (unlike 
ancestor)
+   *   else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then 
otherTask is descendant of currentTask
+   * @param currentTask
+   * @param otherTask
+   * @return
+   */
+  static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, 
TaskName otherTask) {
+    log.info("isOtherTaskDescendantOfCurrentTask with currentTask {} and 
otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && 
isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && 
isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor <= 
currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (otherTaskNameParts.keyBucket % 
currentTaskNameParts.elasticityFactor) == currentTaskNameParts.keyBucket;
+  }
+
+  /**
+   * For a given taskName and a map of task names to checkpoints, returns the 
taskName's ancestor and descendants checkpoints
+   * All ancestor checkpoints are put into a set
+   * Descendant checkpoints are put into a map of elasticityFactor to 
descendant checkpoint where the elasticityFactor is of the descendant.
+   * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 
0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
+   * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
+   * See javadoc of isOtherTaskAncestorOfCurrentTask and 
isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
+   * @param taskName name of the task
+   * @param checkpointMap map from taskName to checkpoint
+   * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
+   */
+  static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> 
getAncestorAndDescendantCheckpoints(
+      TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
+    Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
+    log.info("starting to parse the checkpoint map to find ancestors and 
descendants for taskName {}", taskName.getTaskName());
+    checkpointMap.keySet().forEach(otherTaskName -> {
+      Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName);
+      if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other 
task is ancestor", taskName, otherTaskName);
+        ancestorCheckpoints.add(otherTaskCheckpoint);
+      }
+      if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other 
task is descendant", taskName, otherTaskName);
+        int otherEF = getElasticityFactorFromTaskName(otherTaskName);
+        if (!descendantCheckpoints.containsKey(otherEF)) {
+          descendantCheckpoints.put(otherEF, new HashSet<>());
+        }
+        descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint);
+      }
+    });
+    log.info("done computing all ancestors and descendants of {}", taskName);
+    return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
+  }
+
+  /**
+   * Given a checkpoint with offset map from SystemStreamPartition to offset, 
returns the offset for the desired ssp
+   * Only the system, stream and partition portions of the SSP are matched, 
the keyBucket is not considered.
+   * A checkpoint belongs to one task and a task would consume either the full 
SSP (aka no keyBucket)
+   * or consume exactly one of the keyBuckets of an SSP. Hence there will be 
at most one entry for an SSP in a checkpoint
+   * @param checkpoint Checkpoint containing SSP -> offset
+   * @param ssp SystemStreamPartition for which an offset needs to be fetched
+   * @return offset for the ssp in the Checkpoint or null if it doesn't exist.
+   */
+  static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, 
SystemStreamPartition ssp) {
+    String checkpointStr = checkpoint.getOffsets().entrySet().stream()
+        .map(k -> k.getKey() + " : " + k.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+    log.info("for ssp {}, in checkpoint {}", ssp, checkpointStr);
+
+    Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
+        .stream()
+        .filter(entry -> 
entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey()
+            .getPartition()
+            .equals(ssp.getPartition()))
+        .map(Map.Entry::getValue)
+        .findFirst();
+    if (offsetFound.isPresent()) {
+      return offsetFound.get();
+    }
+    log.warn("Could not find offset for ssp {} in checkpoint {}. returning 
null string as offset", ssp, checkpoint);
+    return null;
+  }
+
+  /**
+   * Given a set of checkpoints, find the max aka largest offset for an ssp
+   * Largest is determined by the SystemAdmin.offsetComparator of the ssp's 
system.
+   * Only the system, stream and partition portions of the SSP are matched, 
the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> 
checkpointSet,
+      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
+    return checkpointSet.stream()
+        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
+        .filter(Objects::nonNull)
+        .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, 
offset1)) //confirm reverse sort - aka largest offset first
+        .findFirst().orElse(null);
+  }
+
+  /**
+   * Given a set of checkpoints, find the min aka smallest offset for an ssp
+   * Smallest is determined by the SystemAdmin.offsetComparator of the ssp's 
system.
+   * Only the system, stream and partition portions of the SSP are matched, 
the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMinOffsetForSSPInCheckpointSet(Set<Checkpoint> 
checkpointSet,
+      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
+    return checkpointSet.stream()
+        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
+        .filter(Objects::nonNull)
+        .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset1, 
offset2)) //confirm ascending sort - aka smallest offset first
+        .findFirst().orElse(null);
+  }
+
+  /**
+   * Prereq: See javadoc for isOtherTaskAncestorOfCurrentTask and 
isOtherTaskDescendantOfCurrentTask to fully understand ancestor and descendant 
notion
+   * Briefly, Given tasks - Partition 0, Partition 0_0_2, Partition 0_1_2 and 
Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   * (recall Partition 0_1_2 means reads input partition 0, keyBucket 1 and 
elasticityFactor 2)
+   * For task Partition 0_0_2: ancestors = [Partition 0, Partition 0_0_2] and 
descendants = [Partition 0_0_4, Partition 0_2_4]
+   *
+   * If a task has no descendants, then we just need to pick the largest 
offset among all the ancestors to get the last processed offset.
+   * for example above, if Partition 0_0_2 only had ancestors and no 
descendants, taking largest offset among Partition 0 and 0_0_2 gives last proc 
offset.
+   *
+   * With descendants, a little care is needed. there could be descendants 
with different elasticity factors.
+   * given one elasticity factor, each descendant within the elasticity 
factor consumes a sub-portion (aka keyBucket) of the task.
+   * hence, to avoid data loss, we need to pick the lowest offset across 
descendants of the same elasticity factor.
+   * Across elasticity factors, largest works just like in ancestor
+   *
+   * Taking a concrete example
+   * From {@link org.apache.samza.system.IncomingMessageEnvelope} (IME)
+   *    Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when 
elasticityFactor=1
+   *    Partition 0_1_2 consuming all IME in SSP_withKeyBucket0 = [systemA, 
streamB, 0, 1 (keyBucket)] when elasticityFactor=2
+   *    Partition 0_0_2 consuming all IME in SSP_withKeyBucket1 = [systemA, 
streamB, 0, 0 (keyBucket)] when elasticityFactor=2
+   *    Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, 
streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *    Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, 
streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *    From the computation of SSP_withkeyBucket in {@link 
org.apache.samza.system.IncomingMessageEnvelope}
+   *    we have getSystemStreamPartition(int elasticityFactor) which does 
keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *    Thus,
+   *       SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1.
+   *       SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *    If the checkpoint map has
+   *      Partition 0: (SSP : 1), Partition 0_0_2: (SSP0 : 2), Partition 
0_1_2: (SSP1 : 3), Partition 0_0_4: (SSP0 : 4), Partition 0_2_4: (SSP1 : 6)
+   *      looking at this map and knowing that offsets are monotonically 
increasing, it is clear that last deploy was with elasticity factor = 4
+   *      to get checkpoint for Partition 0_0_2, we need to consider last 
deploy's offsets.
+   *      picking 6 (offset for Partition 0_2_4) means that 0_0_2 will start 
proc from 6 but offset 5 was never processed.
+   *      hence we need to take min of offsets within an elasticity factor.
+   *
+   * Given checkpoints for all the tasks in the checkpoint stream,
+   * computing the last proc offset for an ssp checkpoint for a task,
+   * the following needs to be met.
+   *    1. Ancestors: we need to take largest offset among ancestors for an ssp
+   *    2. Descendants:
+   *         a. group descendants by their elasticityFactor.
+   *         b. among descendants of the same elasticityFactor, take the 
smallest offset for an ssp
+   *         c. once step b is done, we have (elasticityFactor : 
smallest-offset-for-ssp) set, pick the largest in this set
+   *    3. Pick the larger among the offsets received from step 1 (for 
ancestors) and step 2 (for descendants)
+   *
+   * @param taskName
+   * @param taskSSPSet
+   * @param checkpointMap
+   * @param systemAdmins
+   * @return
+   */
+  public static Map<SystemStreamPartition, String> 
computeLastProcessedOffsetsFromCheckpointMap(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPSet,
+      Map<TaskName, Checkpoint> checkpointMap,
+      SystemAdmins systemAdmins) {
+    Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> 
acnestorsAndDescendantsFound =
+        getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
+    Set<Checkpoint> ancestorCheckpoints = 
acnestorsAndDescendantsFound.getLeft();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = 
acnestorsAndDescendantsFound.getRight();
+
+    Map<SystemStreamPartition, String> taskSSPOffsets = new HashMap<>();
+
+    taskSSPSet.forEach(ssp_withKeyBucket -> {
+      log.info("for taskName {} and ssp of the task {}, finding its last proc 
offset", taskName, ssp_withKeyBucket);
+
+      SystemStreamPartition ssp = new 
SystemStreamPartition(ssp_withKeyBucket.getSystemStream(),
+          ssp_withKeyBucket.getPartition());
+
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+
+      String currentLastOffsetForSSP = null;
+
+      String ancestorLastOffsetForSSP = 
getMaxOffsetForSSPInCheckpointSet(ancestorCheckpoints, ssp, systemAdmin);
+
+      log.info("for taskName {} and ssp {} got lastoffset from ancestors as 
{}",
+          taskName, ssp_withKeyBucket, ancestorLastOffsetForSSP);
+
+      String descendantLastOffsetForSSP = 
descendantCheckpoints.entrySet().stream()
+          .map(entry -> getMinOffsetForSSPInCheckpointSet(entry.getValue(), 
ssp, systemAdmin)) // at each ef level, find min offset
+          .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, 
offset1)) //confirm reverse sort - aka largest offset first
+          .findFirst().orElse(null);
+
+      log.info("for taskName {} and ssp {} got lastoffset from ancestors as 
{}",
+          taskName, ssp_withKeyBucket, descendantLastOffsetForSSP);
+
+      Integer offsetComparison = 
systemAdmin.offsetComparator(ancestorLastOffsetForSSP, 
descendantLastOffsetForSSP);
+      if (offsetComparison != null && offsetComparison > 0) { // means 
ancestorLastOffsetForSSP > descendantLastOffsetForSSP
+        currentLastOffsetForSSP = ancestorLastOffsetForSSP;
+      } else {
+        currentLastOffsetForSSP = descendantLastOffsetForSSP;
+      }
+      log.info("for taskName {} and ssp {} got lastoffset as {}", taskName, 
ssp_withKeyBucket, currentLastOffsetForSSP);
+      taskSSPOffsets.put(ssp_withKeyBucket, currentLastOffsetForSSP);
+    });
+
+    String checkpointStr = taskSSPOffsets.entrySet().stream()
+        .map(k -> k.getKey() + " : " + k.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+    log.info("for taskName {}, returning checkpoint as {}", taskName, 
checkpointStr);
+    return taskSSPOffsets;
+  }
+
+  public static boolean wasElasticityEnabled(Map<TaskName, Checkpoint> 
checkpointMap) {
+    return checkpointMap.keySet().stream()
+        .filter(ElasticityUtils::isTaskNameElastic) // true if the taskName 
has elasticityFactor in it
+        .findFirst().isPresent();
+  }
+}
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 7491eaaaf..7a12625f0 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -22,12 +22,13 @@ package org.apache.samza.checkpoint
 import java.util
 import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.annotation.InterfaceStability
-import org.apache.samza.config.{Config, StreamConfig, SystemConfig}
+import org.apache.samza.checkpoint.OffsetManager.info
+import org.apache.samza.config.{Config, JobConfig, StreamConfig, SystemConfig}
 import org.apache.samza.container.TaskName
+import org.apache.samza.elasticity.ElasticityUtils
 import org.apache.samza.startpoint.{Startpoint, StartpointManager}
 import org.apache.samza.system.SystemStreamMetadata.OffsetType
 import org.apache.samza.system._
@@ -105,7 +106,10 @@ object OffsetManager extends Logging {
           // Build OffsetSetting so we can create a map for OffsetManager.
           (systemStream, OffsetSetting(systemStreamMetadata, 
defaultOffsetType, resetOffset))
       }.toMap
-    new OffsetManager(offsetSettings, checkpointManager, startpointManager, 
systemAdmins, checkpointListeners, offsetManagerMetrics)
+    val elasticityCheckpointsEnabled = new 
JobConfig(config).getElasticityCheckpointEnabled
+
+    new OffsetManager(offsetSettings, checkpointManager, startpointManager, 
systemAdmins, checkpointListeners,
+      offsetManagerMetrics, elasticityCheckpointsEnabled)
   }
 }
 
@@ -160,7 +164,12 @@ class OffsetManager(
   /**
    * offsetManagerMetrics for keeping track of checkpointed offsets of each 
SystemStreamPartition.
    */
-  val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) 
extends Logging {
+  val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
+
+  /**
+   * if true, checkpoints generated during elasticity deploys will be used for 
last processed offsets computation at container start
+   */
+  val elasticityCheckpointsEnabled: Boolean = false) extends Logging {
 
   /**
    * Last offsets processed for each SystemStreamPartition.
@@ -461,12 +470,19 @@ class OffsetManager(
 
     val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
-    if (checkpoint != null) {
-      Map(taskName -> checkpoint.getOffsets.asScala.toMap)
-    } else {
-      info("Did not receive a checkpoint for taskName %s. Proceeding without a 
checkpoint." format taskName)
-      Map(taskName -> Map())
+    val checkpointMap = checkpointManager.readAllCheckpoints()
+    if (!elasticityCheckpointsEnabled || 
!ElasticityUtils.wasElasticityEnabled(checkpointMap)) {
+      if (checkpoint != null) {
+        return Map(taskName -> checkpoint.getOffsets.asScala.toMap)
+      } else {
+        info("Did not receive a checkpoint for taskName %s. Proceeding without 
a checkpoint." format taskName)
+        return Map(taskName -> Map())
+      }
     }
+    info("Elasticity checkpoints is enabled and there was elasticity enabled 
in one of the previous deploys." +
+      "Last processed offsets computation at container start will use 
elasticity checkpoints if available.")
+    Map(taskName -> 
ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(taskName,
+      systemStreamPartitions.get(taskName).get.asJava, checkpointMap, 
systemAdmins).asScala)
   }
 
   /**
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 4d171662c..9cf70ff07 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -600,6 +600,15 @@ public class TestJobConfig {
         "false"))).getContainerHeartbeatMonitorEnabled());
   }
 
+  @Test
+  public void testGetElasticityCheckpointEnabled() {
+    assertFalse(new JobConfig(new 
MapConfig()).getElasticityCheckpointEnabled());
+    assertTrue(new JobConfig(new MapConfig(
+        ImmutableMap.of(JobConfig.JOB_ELASTICITY_CHECKPOINTS_ENABLED, 
"true"))).getElasticityCheckpointEnabled());
+    assertFalse(new JobConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_CHECKPOINTS_ENABLED,
+        "false"))).getElasticityCheckpointEnabled());
+  }
+
   @Test
   public void testGetElastictyEnabled() {
     // greater than 1 means enabled
diff --git 
a/samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java 
b/samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java
new file mode 100644
index 000000000..7590c9a3a
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+// TODO: going to make this entire class parameterized.
+public class TestElasticityUtils {
+  private static final TaskName TASKNAME_GROUP_BY_PARTITION = new 
TaskName("Partition 0");
+  private static final TaskName ELASTIC_TASKNAME_GROUP_BY_PARTITION = new 
TaskName("Partition 0_1_2");
+  private static final TaskName TASKNAME_GROUP_BY_SSP = new 
TaskName("SystemStreamPartition [systemA, streamB, 0]");
+  private static final TaskName ELASTIC_TASKNAME_GROUP_BY_SSP = new 
TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2");
+
+  @Test
+  public void testComputeLastProcessedOffsetsFromCheckpointMap() {
+    // Setup :
+    // there is one ssp = SystemStreamPartition [systemA, streamB, 
partition(0)] consumed by the job
+    // Note: Partition 0_1_2 means task consumes keyBucket 1 of partition 0 
and has elasticityFactor 2.
+    // Before elasticity, job has one task with name "Partition 0"
+    // with elasticity factor 2, job has 2 tasks with names "Partition 0_0_2" 
and "Partition 0_1_2"
+    //         Partition 0_0_2 consumes SSP[systemA, stream B, partition(0), 
keyBucket(0)]
+    //         Partition 0_1_2 consumes SSP[systemA, stream B, partition(0), 
keyBucket(1)]
+    // with elasticity factor 4, job has 4 tasks with names "Partition 0_0_4", 
"Partition 0_1_4", "Partition 0_2_4" and "Partition 0_3_4"
+    //         Partition 0_0_4 consumes SSP[systemA, stream B, partition(0), 
keyBucket(0)]
+    //         Partition 0_1_4 consumes SSP[systemA, stream B, partition(0), 
keyBucket(1)]
+    //         Partition 0_2_4 consumes SSP[systemA, stream B, partition(0), 
keyBucket(2)]
+    //         Partition 0_3_4 consumes SSP[systemA, stream B, partition(0), 
keyBucket(3)]
+
+    //
+    // From the definition of keyBucket computation using elasticity factor in
+    // {@link 
IncomingMessageEnvelope.getSystemStreamPartition(elasticityFactor)} as
+    // keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % 
elasticityFactor
+    // messages processed by 0_0_4 and 0_2_4 will be the same as those 
processed by 0_0_2
+    // messages processed by 0_1_4 and 0_3_4 will be the same as those 
processed by 0_1_2
+    // messages processed by 0_0_2 and 0_1_2 will be the same as those 
processed by Partition 0 itself
+
+    TaskName taskName = new TaskName("Partition 0_0_2");
+    Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
+    SystemStreamPartition ssp = new SystemStreamPartition("systemA", 
"streamB", new Partition(0));
+    SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", 
"streamB", new Partition(0), 0);
+    SystemStreamPartition ssp2 = new SystemStreamPartition("systemA", 
"streamB", new Partition(0), 2);
+
+
+    SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
+    // offsets ordering 1 < 2 < 3 < 4
+    Mockito.when(mockSystemAdmin.offsetComparator("1", "2")).thenReturn(-1);
+    Mockito.when(mockSystemAdmin.offsetComparator("2", "1")).thenReturn(1);
+    Mockito.when(mockSystemAdmin.offsetComparator("1", "3")).thenReturn(-1);
+    Mockito.when(mockSystemAdmin.offsetComparator("3", "1")).thenReturn(1);
+    Mockito.when(mockSystemAdmin.offsetComparator("1", "4")).thenReturn(-1);
+    Mockito.when(mockSystemAdmin.offsetComparator("4", "1")).thenReturn(1);
+    Mockito.when(mockSystemAdmin.offsetComparator("2", "3")).thenReturn(-1);
+    Mockito.when(mockSystemAdmin.offsetComparator("3", "2")).thenReturn(1);
+    Mockito.when(mockSystemAdmin.offsetComparator("2", "4")).thenReturn(-1);
+    Mockito.when(mockSystemAdmin.offsetComparator("4", "2")).thenReturn(1);
+    Mockito.when(mockSystemAdmin.offsetComparator("3", "4")).thenReturn(-1);
+    Mockito.when(mockSystemAdmin.offsetComparator("4", "3")).thenReturn(1);
+
+    SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class);
+    
Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin);
+
+    // case 1: for task Partition 0_0_2: last deploy was with ef = 2 itself.
+    // hence "Partition 0_0_2" has the largest offset and that should be used 
for computing checkpoint for 0_0_2 now also
+    checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, 
"1"));
+    checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, 
"4"));
+    checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, 
"2"));
+    checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, 
"3"));
+    Map<SystemStreamPartition, String> result = 
ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
+        taskName, Collections.singleton(ssp0), checkpointMap, 
mockSystemAdmins);
+    Assert.assertEquals("4", result.get(ssp0));
+
+    // case 2: for task Partition 0_0_2: last deploy was with ef = 1
+    // hence "Partition 0" has the largest offset. Computing checkpoint for 
0_0_2 should use this largest offset
+    checkpointMap = new HashMap<>();
+    checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, 
"4"));
+    checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, 
"1"));
+    checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, 
"3"));
+    checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, 
"2"));
+
+
+    result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
+        taskName, Collections.singleton(ssp0), checkpointMap, 
mockSystemAdmins);
+    Assert.assertEquals("4", result.get(ssp0));
+
+
+    // case 3: for task partition 0_0_2: last deploy was with ef = 4
+    // hence checkpoints of Partition 0_0_4 and Partition 0_3_4 are relevant.
+    // since messages from both end up in 0_0_2 with ef=2, need to take min of 
their checkpointed offsets
+
+    checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, 
"1"));
+    checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, 
"2"));
+    checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, 
"3"));
+    checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, 
"4"));
+    result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
+        taskName, Collections.singleton(ssp0), checkpointMap, 
mockSystemAdmins);
+    Assert.assertEquals("3", result.get(ssp0));
+  }
+
+  @Test
+  public void testTaskIsGroupByPartitionOrGroupBySSP() {
+    String msgPartition = "GroupByPartition task should start with Partition";
+    String msgSsp = "GroupBySystemStreamPartition task should start with 
SystemStreamPartition";
+
+    Assert.assertTrue(msgPartition, 
ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_PARTITION));
+    Assert.assertFalse(msgPartition, 
ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_PARTITION));
+
+    Assert.assertTrue(msgPartition, 
ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
+    Assert.assertFalse(msgPartition, 
ElasticityUtils.isGroupBySystemStreamPartitionTask(
+        ELASTIC_TASKNAME_GROUP_BY_PARTITION));
+
+    Assert.assertTrue(msgSsp, 
ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_SSP));
+    Assert.assertFalse(msgSsp, 
ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_SSP));
+
+    Assert.assertTrue(msgSsp, 
ElasticityUtils.isGroupBySystemStreamPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
+    Assert.assertFalse(msgSsp, 
ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
+
+    TaskName taskName = new TaskName("FooBar");
+    Assert.assertFalse(msgPartition, 
ElasticityUtils.isGroupByPartitionTask(taskName));
+    Assert.assertFalse(msgSsp, 
ElasticityUtils.isGroupBySystemStreamPartitionTask(taskName));
+  }
+
+  @Test
+  public void testIsTaskNameElastic() {
+    
Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_SSP));
+    
Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_SSP));
+    
Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_PARTITION));
+    
Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
+  }
+
+  @Test
+  public void testGetElasticTaskNameParts() {
+    ElasticTaskNameParts taskNameParts = 
ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_PARTITION);
+    Assert.assertEquals(taskNameParts.partition, 0);
+    Assert.assertEquals(taskNameParts.keyBucket, 
ElasticTaskNameParts.DEFAULT_KEY_BUCKET);
+    Assert.assertEquals(taskNameParts.elasticityFactor, 
ElasticTaskNameParts.DEFAULT_ELASTICITY_FACTOR);
+
+    taskNameParts = 
ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_PARTITION);
+    Assert.assertEquals(taskNameParts.partition, 0);
+    Assert.assertEquals(taskNameParts.keyBucket, 1);
+    Assert.assertEquals(taskNameParts.elasticityFactor, 2);
+
+    taskNameParts = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_SSP);
+    Assert.assertEquals(taskNameParts.system, "systemA");
+    Assert.assertEquals(taskNameParts.stream, "streamB");
+    Assert.assertEquals(taskNameParts.partition, 0);
+    Assert.assertEquals(taskNameParts.keyBucket, 
ElasticTaskNameParts.DEFAULT_KEY_BUCKET);
+    Assert.assertEquals(taskNameParts.elasticityFactor, 
ElasticTaskNameParts.DEFAULT_ELASTICITY_FACTOR);
+
+    taskNameParts = 
ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_SSP);
+    Assert.assertEquals(taskNameParts.system, "systemA");
+    Assert.assertEquals(taskNameParts.stream, "streamB");
+    Assert.assertEquals(taskNameParts.partition, 0);
+    Assert.assertEquals(taskNameParts.keyBucket, 1);
+    Assert.assertEquals(taskNameParts.elasticityFactor, 2);
+
+    taskNameParts = ElasticityUtils.getTaskNameParts(new TaskName("FooBar"));
+    Assert.assertEquals(taskNameParts.partition, 
ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  @Test
+  public void testIsOtherTaskAncestorDescendantOfCurrentTask() {
+    TaskName task0 = new TaskName("Partition 0");
+    TaskName task1 = new TaskName("Partition 1");
+    TaskName task002 = new TaskName("Partition 0_0_2");
+    TaskName task012 = new TaskName("Partition 0_1_2");
+    TaskName task004 = new TaskName("Partition 0_0_4");
+    TaskName task014 = new TaskName("Partition 0_1_4");
+    TaskName task024 = new TaskName("Partition 0_2_4");
+    TaskName task034 = new TaskName("Partition 0_3_4");
+
+    TaskName sspTask0 = new TaskName("SystemStreamPartition [systemA, streamB, 
0]");
+    TaskName sspTask002 = new TaskName("SystemStreamPartition [systemA, 
streamB, 0, 0]_2");
+    TaskName sspTask012 = new TaskName("SystemStreamPartition [systemA, 
streamB, 0, 1]_2");
+    TaskName sspTask004 = new TaskName("SystemStreamPartition [systemA, 
streamB, 0, 0]_4");
+    TaskName sspTask014 = new TaskName("SystemStreamPartition [systemA, 
streamB, 0, 1]_4");
+    TaskName sspTask024 = new TaskName("SystemStreamPartition [systemA, 
streamB, 0, 2]_4");
+    TaskName sspTask034 = new TaskName("SystemStreamPartition [systemA, 
streamB, 0, 3]_4");
+
+    // Partition 0 is ancestor of all tasks Partition 0_0_2, 0_1_2, 0_0_4, 
0_1_4, 0_2_4, 0_3_4 and itself
+    // and all these tasks are descendants of Partition 0 (except itself)
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, 
task0));
+    Assert.assertFalse(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, 
task1));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, 
task0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task012, 
task0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, 
task0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task014, 
task0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, 
task0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task034, 
task0));
+
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, 
task002));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, 
task012));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, 
task004));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, 
task014));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, 
task024));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, 
task034));
+
+    // Partition 0_0_2 is ancestor of tasks Partition 0_0_4 and 0_2_4 and 
itself
+    // these tasks are descendants of 0_0_2
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, 
task002));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, 
task002));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, 
task002));
+
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, 
task004));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, 
task024));
+
+    // "SystemStreamPartition [systemA, streamB, 0]
+    // is ancestor of all tasks "SystemStreamPartition [systemA, streamB, 0, 
0]_2, [systemA, streamB, 0, 1]_2 and the rest incl itself
+    // and all these tasks are descendants of Partition 0 (except itself)
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask0, 
sspTask0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, 
sspTask0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask012, 
sspTask0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, 
sspTask0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask014, 
sspTask0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, 
sspTask0));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask034, 
sspTask0));
+
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, 
sspTask002));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, 
sspTask012));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, 
sspTask004));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, 
sspTask014));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, 
sspTask024));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, 
sspTask034));
+
+    // SystemStreamPartition [systemA, streamB, 0, 0]_2 is ancestor of
+    // tasks SystemStreamPartition [systemA, streamB, 0, 0]_4, 
SystemStreamPartition [systemA, streamB, 0, 2]_4 and itself
+    // similarly, these tasks are descendants of SystemStreamPartition 
[systemA, streamB, 0, 0]_2
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, 
sspTask002));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, 
sspTask002));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, 
sspTask002));
+
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002,
 sspTask004));
+    
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002,
 sspTask024));
+  }
+
+  @Test
+  public void testGetAncestorAndDescendantCheckpoints() {
+    TaskName taskName = new TaskName("Partition 0_0_2");
+    Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
+    SystemStreamPartition ssp = new SystemStreamPartition("systemA", 
"streamB", new Partition(0));
+    Checkpoint ansCheckpoint1 = buildCheckpointV2(ssp, "1");
+    Checkpoint ansCheckpoint2 = buildCheckpointV2(ssp, "2");
+    Checkpoint desCheckpoint1 = buildCheckpointV2(ssp, "3");
+    Checkpoint desCheckpoint2 = buildCheckpointV2(ssp, "4");
+    Checkpoint unrelCheckpoint = buildCheckpointV2(ssp, "5");
+    Set<Checkpoint> ansCheckpointSet = new 
HashSet<>(Arrays.asList(ansCheckpoint1, ansCheckpoint2));
+    Set<Checkpoint> desCheckpointSet = new 
HashSet<>(Arrays.asList(desCheckpoint1, desCheckpoint2));
+
+    checkpointMap.put(new TaskName("Partition 0"), ansCheckpoint1);
+    checkpointMap.put(new TaskName("Partition 0_0_2"), ansCheckpoint2);
+    checkpointMap.put(new TaskName("Partition 0_0_4"), desCheckpoint1);
+    checkpointMap.put(new TaskName("Partition 0_2_4"), desCheckpoint2);
+    checkpointMap.put(new TaskName("Partition 0_1_4"), unrelCheckpoint);
+
+    Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> result =
+        ElasticityUtils.getAncestorAndDescendantCheckpoints(taskName, 
checkpointMap);
+    Set<Checkpoint> anscestorCheckpointSet = result.getLeft();
+    Set<Checkpoint> descendantCheckpointSetForEf4 = result.getRight().get(4);
+
+    Assert.assertTrue("should contain all ancestors' checkpoints",
+        anscestorCheckpointSet.containsAll(ansCheckpointSet));
+    Assert.assertFalse("should not contain a descendant checkpoint in anscetor 
list",
+        anscestorCheckpointSet.contains(desCheckpoint1));
+    Assert.assertFalse("should not contain an unrelated checkpoint in ancestor 
list",
+        anscestorCheckpointSet.contains(unrelCheckpoint));
+
+    Assert.assertTrue("should contain all descendants' checkpoints",
+        descendantCheckpointSetForEf4.containsAll(desCheckpointSet));
+    Assert.assertFalse("should not contain a anscetor checkpoint in descendant 
list",
+        descendantCheckpointSetForEf4.contains(ansCheckpoint1));
+    Assert.assertFalse("should not contain an unrelated checkpoint in 
descendant list",
+        descendantCheckpointSetForEf4.contains(unrelCheckpoint));
+  }
+
+  @Test
+  public void testGetOffsetForSSPInCheckpoint() {
+    String offset1 = "1111";
+    String offset2 = "2222";
+    // case 1: when looking for exact ssp
+    SystemStreamPartition ssp = new SystemStreamPartition("systemA", 
"streamB", new Partition(0));
+    Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
+    
Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, 
ssp), offset1);
+
+    // case 2: checkpoint has ssp with key bucket but looking for the full ssp 
(same system stream and partition but without keybucket)
+    SystemStreamPartition sspWithKB = new SystemStreamPartition("systemA", 
"streamB", new Partition(0), 1);
+    checkpoint1 = buildCheckpointV2(sspWithKB, offset2);
+    
Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, 
ssp), offset2);
+
+    // case 3: try getting offset for an ssp not present in the checkpoint -> 
should return null
+    SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new 
Partition(1));
+    
Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, 
ssp2), null);
+  }
+
+  @Test
+  public void testGetMaxMinOffsetForSSPInCheckpointSet() {
+    String offset1 = "1111";
+    String offset2 = "2222";
+
+    SystemStreamPartition ssp = new SystemStreamPartition("systemA", 
"streamB", new Partition(0));
+    Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
+    Checkpoint checkpoint2 = buildCheckpointV2(ssp, offset2);
+    Set<Checkpoint> checkpointSet = new HashSet<>(Arrays.asList(checkpoint1, 
checkpoint2));
+
+    SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
+    // offset 1 < offset2
+    Mockito.when(mockSystemAdmin.offsetComparator(offset1, 
offset2)).thenReturn(-1);
+    Mockito.when(mockSystemAdmin.offsetComparator(offset2, 
offset1)).thenReturn(1);
+
+    // case 1: when exact ssp is in checkpoint set
+    Assert.assertEquals(offset2, 
ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp, 
mockSystemAdmin));
+    Assert.assertEquals(offset1, 
ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp, 
mockSystemAdmin));
+
+    // case 2: when looking for ssp with keyBucket 1 whereas checkpoint set 
only has full ssp (same system stream and partition but without keybucket)
+    SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, 1);
+    Assert.assertEquals(offset2, 
ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, 
sspWithKeyBucket, mockSystemAdmin));
+    Assert.assertEquals(offset1, 
ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, 
sspWithKeyBucket, mockSystemAdmin));
+
+
+    // case 3: when ssp not in checkpoint set -> should receive null for min 
and max offset
+    SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new 
Partition(0));
+    Assert.assertEquals(null, 
ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp2, 
mockSystemAdmin));
+    Assert.assertEquals(null, 
ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp2, 
mockSystemAdmin));
+  }
+
+  private static CheckpointV2 buildCheckpointV2(SystemStreamPartition ssp, 
String offset) {
+    return new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, 
offset),
+        ImmutableMap.of("backend", ImmutableMap.of("store", "10")));
+  }
+}

Reply via email to