This is an automated email from the ASF dual-hosted git repository. bharathkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 295119349 remove keyBucket from checkpoint serde to allow for simpler rollback (#1625) 295119349 is described below commit 29511934943d1501f369f49ad1b9bd78d66febe1 Author: lakshmi-manasa-g <mgadup...@linkedin.com> AuthorDate: Tue Aug 2 16:19:21 2022 -0700 remove keyBucket from checkpoint serde to allow for simpler rollback (#1625) Symptom: Rolling back to versions which don't accept 4 parts in checkpoint serde can throw NPEs. Cause: As part of elasticity, #1608 introduced keyBucket into checkpoint serde. Fix: Remove keyBucket from checkpoint serde, i.e. do not add it when serializing a checkpoint. Backwards Compatible: yes. Though checkpoints written after #1608 and before this PR will have a 4-part SSP in the checkpoint serde, they can still be read by code in this PR and vice versa. Elasticity will not work completely once this PR is merged. --- .../samza/serializers/model/SamzaObjectMapper.java | 16 ++++------------ .../org/apache/samza/serializers/CheckpointV1Serde.scala | 8 +------- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java index b317e9d98..3470fd855 100644 --- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java @@ -221,9 +221,7 @@ public class SamzaObjectMapper { public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> { @Override public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException { - String sspString = ssp.getSystem() + "." + ssp.getStream() + "." - + String.valueOf(ssp.getPartition().getPartitionId()) + "." 
- + String.valueOf(ssp.getKeyBucket()); + String sspString = ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId()); jgen.writeFieldName(sspString); } } @@ -232,16 +230,10 @@ public class SamzaObjectMapper { @Override public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException { String[] parts = sspString.split("\\."); - if (parts.length < 3 || parts.length > 4) { - throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' " - + "or 'system.stream.partition.keyBucket"); + if (parts.length < 3) { + throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' "); } - if (parts.length == 3) { - return new SystemStreamPartition( - new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2]))); - } - // else parts.length == 4 and the 4th part is the keyBucket - return new SystemStreamPartition(parts[0], parts[1], new Partition(Integer.parseInt(parts[2])), Integer.parseInt(parts[3])); + return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2]))); } } diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala index c817e2ffe..e69a97300 100644 --- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala +++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala @@ -55,12 +55,7 @@ class CheckpointV1Serde extends Serde[CheckpointV1] with Logging { require(partition != null, "Partition must be present in JSON-encoded SystemStreamPartition") val offset = sspInfo.get("offset") // allow null offsets, e.g. 
for changelog ssps - var keyBucket = sspInfo.get("keyBucket") - if (keyBucket == null) { - keyBucket = "-1" - } - - new SystemStreamPartition(system, stream, new Partition(partition.toInt), keyBucket.toInt) -> offset + new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset } val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap @@ -83,7 +78,6 @@ class CheckpointV1Serde extends Serde[CheckpointV1] with Logging { jMap.put("system", ssp.getSystemStream.getSystem) jMap.put("stream", ssp.getSystemStream.getStream) jMap.put("partition", ssp.getPartition.getPartitionId.toString) - jMap.put("keyBucket", ssp.getKeyBucket.toString) jMap.put("offset", offset) asMap.put(ssp.toString, jMap)