This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 295119349 remove keyBucket from checkpoint serde to allow for simpler
rollback (#1625)
295119349 is described below
commit 29511934943d1501f369f49ad1b9bd78d66febe1
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Tue Aug 2 16:19:21 2022 -0700
remove keyBucket from checkpoint serde to allow for simpler rollback (#1625)
Symptom: Rolling back to versions that don't accept four-part SSP keys in the
checkpoint serde can throw NullPointerExceptions.
Cause: As part of the elasticity work, #1608 introduced keyBucket into the
checkpoint serde.
Fix: Remove keyBucket from the checkpoint serde, i.e., do not write it when
serializing a checkpoint.
Backwards Compatible: yes.
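Although checkpoints written after #1608 and before this PR contain four-part
SSP keys in the checkpoint serde, they can still be read by the code in this PR
(the keyBucket part is ignored). Conversely, checkpoints written by this PR can
still be read by code that includes #1608.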
Note: elasticity will not fully work once this PR is merged.
---
.../samza/serializers/model/SamzaObjectMapper.java | 16 ++++------------
.../org/apache/samza/serializers/CheckpointV1Serde.scala | 8 +-------
2 files changed, 5 insertions(+), 19 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index b317e9d98..3470fd855 100644
---
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -221,9 +221,7 @@ public class SamzaObjectMapper {
public static class SystemStreamPartitionKeySerializer extends
JsonSerializer<SystemStreamPartition> {
@Override
public void serialize(SystemStreamPartition ssp, JsonGenerator jgen,
SerializerProvider provider) throws IOException {
- String sspString = ssp.getSystem() + "." + ssp.getStream() + "."
- + String.valueOf(ssp.getPartition().getPartitionId()) + "."
- + String.valueOf(ssp.getKeyBucket());
+ String sspString = ssp.getSystem() + "." + ssp.getStream() + "." +
String.valueOf(ssp.getPartition().getPartitionId());
jgen.writeFieldName(sspString);
}
}
@@ -232,16 +230,10 @@ public class SamzaObjectMapper {
@Override
public Object deserializeKey(String sspString, DeserializationContext
ctxt) throws IOException {
String[] parts = sspString.split("\\.");
- if (parts.length < 3 || parts.length > 4) {
- throw new IllegalArgumentException("System stream partition expected
in format 'system.stream.partition' "
- + "or 'system.stream.partition.keyBucket");
+ if (parts.length < 3) {
+ throw new IllegalArgumentException("System stream partition expected
in format 'system.stream.partition' ");
}
- if (parts.length == 3) {
- return new SystemStreamPartition(
- new SystemStream(parts[0], parts[1]), new
Partition(Integer.parseInt(parts[2])));
- }
- // else parts.length == 4 and the 4th part is the keyBucket
- return new SystemStreamPartition(parts[0], parts[1], new
Partition(Integer.parseInt(parts[2])), Integer.parseInt(parts[3]));
+ return new SystemStreamPartition(new SystemStream(parts[0], parts[1]),
new Partition(Integer.parseInt(parts[2])));
}
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
index c817e2ffe..e69a97300 100644
---
a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
+++
b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
@@ -55,12 +55,7 @@ class CheckpointV1Serde extends Serde[CheckpointV1] with
Logging {
require(partition != null, "Partition must be present in JSON-encoded
SystemStreamPartition")
val offset = sspInfo.get("offset")
// allow null offsets, e.g. for changelog ssps
- var keyBucket = sspInfo.get("keyBucket")
- if (keyBucket == null) {
- keyBucket = "-1"
- }
-
- new SystemStreamPartition(system, stream, new
Partition(partition.toInt), keyBucket.toInt) -> offset
+ new SystemStreamPartition(system, stream, new
Partition(partition.toInt)) -> offset
}
val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap
@@ -83,7 +78,6 @@ class CheckpointV1Serde extends Serde[CheckpointV1] with
Logging {
jMap.put("system", ssp.getSystemStream.getSystem)
jMap.put("stream", ssp.getSystemStream.getStream)
jMap.put("partition", ssp.getPartition.getPartitionId.toString)
- jMap.put("keyBucket", ssp.getKeyBucket.toString)
jMap.put("offset", offset)
asMap.put(ssp.toString, jMap)