This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 295119349 remove keyBucket from checkpoint serde to allow for simpler 
rollback (#1625)
295119349 is described below

commit 29511934943d1501f369f49ad1b9bd78d66febe1
Author: lakshmi-manasa-g <mgadup...@linkedin.com>
AuthorDate: Tue Aug 2 16:19:21 2022 -0700

    remove keyBucket from checkpoint serde to allow for simpler rollback (#1625)
    
    Symptom: Rolling back to versions that don't accept 4 parts in the checkpoint serde can throw NPEs.
    Cause: As part of the elasticity work, #1608 introduced keyBucket into the checkpoint serde.
    Fix: Remove keyBucket from the checkpoint serde, i.e. do not add it when serializing checkpoints.
    
    Backwards Compatible: yes.
    Although checkpoints written after #1608 and before this PR will have 4-part SSPs in the checkpoint serde, they can still be read by the code in this PR, and vice versa.
    Note: elasticity will not fully work once this PR is merged.
---
 .../samza/serializers/model/SamzaObjectMapper.java       | 16 ++++------------
 .../org/apache/samza/serializers/CheckpointV1Serde.scala |  8 +-------
 2 files changed, 5 insertions(+), 19 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
 
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index b317e9d98..3470fd855 100644
--- 
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -221,9 +221,7 @@ public class SamzaObjectMapper {
   public static class SystemStreamPartitionKeySerializer extends 
JsonSerializer<SystemStreamPartition> {
     @Override
     public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, 
SerializerProvider provider) throws IOException {
-      String sspString = ssp.getSystem() + "." + ssp.getStream() + "."
-          + String.valueOf(ssp.getPartition().getPartitionId()) + "."
-          + String.valueOf(ssp.getKeyBucket());
+      String sspString = ssp.getSystem() + "." + ssp.getStream() + "." + 
String.valueOf(ssp.getPartition().getPartitionId());
       jgen.writeFieldName(sspString);
     }
   }
@@ -232,16 +230,10 @@ public class SamzaObjectMapper {
     @Override
     public Object deserializeKey(String sspString, DeserializationContext 
ctxt) throws IOException {
       String[] parts = sspString.split("\\.");
-      if (parts.length < 3 || parts.length > 4) {
-        throw new IllegalArgumentException("System stream partition expected 
in format 'system.stream.partition' "
-            + "or 'system.stream.partition.keyBucket");
+      if (parts.length < 3) {
+        throw new IllegalArgumentException("System stream partition expected 
in format 'system.stream.partition' ");
       }
-      if (parts.length == 3) {
-        return new SystemStreamPartition(
-            new SystemStream(parts[0], parts[1]), new 
Partition(Integer.parseInt(parts[2])));
-      }
-      // else parts.length == 4 and the 4th part is the keyBucket
-      return new SystemStreamPartition(parts[0], parts[1], new 
Partition(Integer.parseInt(parts[2])), Integer.parseInt(parts[3]));
+      return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), 
new Partition(Integer.parseInt(parts[2])));
     }
   }
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
 
b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
index c817e2ffe..e69a97300 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
@@ -55,12 +55,7 @@ class CheckpointV1Serde extends Serde[CheckpointV1] with 
Logging {
         require(partition != null, "Partition must be present in JSON-encoded 
SystemStreamPartition")
         val offset = sspInfo.get("offset")
         // allow null offsets, e.g. for changelog ssps
-        var keyBucket = sspInfo.get("keyBucket")
-        if (keyBucket == null) {
-          keyBucket = "-1"
-        }
-
-        new SystemStreamPartition(system, stream, new 
Partition(partition.toInt), keyBucket.toInt) -> offset
+        new SystemStreamPartition(system, stream, new 
Partition(partition.toInt)) -> offset
       }
 
       val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap
@@ -83,7 +78,6 @@ class CheckpointV1Serde extends Serde[CheckpointV1] with 
Logging {
         jMap.put("system", ssp.getSystemStream.getSystem)
         jMap.put("stream", ssp.getSystemStream.getStream)
         jMap.put("partition", ssp.getPartition.getPartitionId.toString)
-        jMap.put("keyBucket", ssp.getKeyBucket.toString)
         jMap.put("offset", offset)
 
         asMap.put(ssp.toString, jMap)

Reply via email to