This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 08931ce KafkaCheckpointLogKeySerde should support deserializing multiple
keys (#1502)
08931ce is described below
commit 08931cedea99f35546f4b76923bfe6840bf2144a
Author: Daniel Chen <[email protected]>
AuthorDate: Wed May 19 15:47:11 2021 -0700
KafkaCheckpointLogKeySerde should support deserializing multiple keys (#1502)
---
.../checkpoint/kafka/KafkaCheckpointLogKey.java | 3 -
.../kafka/KafkaCheckpointLogKeySerde.java | 4 --
.../checkpoint/kafka/KafkaCheckpointManager.scala | 33 ++++++-----
.../kafka/TestKafkaCheckpointLogKeySerde.java | 12 ++++
.../kafka/TestKafkaCheckpointManager.scala | 68 ++++++++++++++++++++--
5 files changed, 95 insertions(+), 25 deletions(-)
diff --git
a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
index 05114f9..a732aba 100644
---
a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
+++
b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -48,9 +48,6 @@ public class KafkaCheckpointLogKey {
Preconditions.checkNotNull(type);
Preconditions.checkState(!grouperFactoryClassName.isEmpty(), "Empty
grouper factory class provided");
- Preconditions.checkState(type.equals(CHECKPOINT_KEY_TYPE),
String.format("Invalid type provided for checkpoint key. " +
- "Expected: (%s) Actual: (%s)", CHECKPOINT_KEY_TYPE, type));
-
this.grouperFactoryClassName = grouperFactoryClassName;
this.taskName = taskName;
this.type = type;
diff --git
a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
index b00f12e..e738190 100644
---
a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
+++
b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -59,10 +59,6 @@ public class KafkaCheckpointLogKeySerde implements
Serde<KafkaCheckpointLogKey>
try {
LinkedHashMap<String, String> deserializedKey = MAPPER.readValue(bytes,
LinkedHashMap.class);
- if
(!KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(deserializedKey.get(TYPE_FIELD)))
{
- throw new IllegalArgumentException(String.format("Invalid key
detected. Type of the key is %s", deserializedKey.get(TYPE_FIELD)));
- }
-
return new KafkaCheckpointLogKey(deserializedKey.get(TYPE_FIELD), new
TaskName(deserializedKey.get(TASK_NAME_FIELD)),
deserializedKey.get(SSP_GROUPER_FACTORY_FIELD)
);
} catch (Exception e) {
diff --git
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 757e7ae..d88048e 100644
---
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -159,19 +159,7 @@ class KafkaCheckpointManager(checkpointSpec:
KafkaStreamSpec,
* @inheritdoc
*/
override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
- val key = new
KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, taskName,
expectedGrouperFactory)
- val keyBytes = try {
- checkpointKeySerde.toBytes(key)
- } catch {
- case e: Exception => throw new SamzaException(s"Exception when writing
checkpoint-key for $taskName: $checkpoint", e)
- }
- val msgBytes = try {
- checkpointMsgSerde.toBytes(checkpoint)
- } catch {
- case e: Exception => throw new SamzaException(s"Exception when writing
checkpoint for $taskName: $checkpoint", e)
- }
-
- val envelope = new OutgoingMessageEnvelope(checkpointSsp, keyBytes,
msgBytes)
+ val envelope = buildOutgoingMessageEnvelope(taskName, checkpoint)
// Used for exponential backoff retries on failure in sending messages
through producer.
val startTimeInMillis: Long = System.currentTimeMillis()
@@ -188,7 +176,7 @@ class KafkaCheckpointManager(checkpointSpec:
KafkaStreamSpec,
} catch {
case exception: Exception => {
producerException = exception
- warn(s"Retrying failed checkpoint write to key: $key, checkpoint:
$checkpoint for task: $taskName", exception)
+ warn(s"Retrying failed checkpoint write for checkpoint: $checkpoint
for task: $taskName", exception)
// TODO: Remove this producer recreation logic after SAMZA-1393.
val newProducer: SystemProducer = getSystemProducer()
producerCreationLock.synchronized {
@@ -333,4 +321,21 @@ class KafkaCheckpointManager(checkpointSpec:
KafkaStreamSpec,
partitionMetaData.getOldestOffset
}
+
+ @VisibleForTesting
+ def buildOutgoingMessageEnvelope(taskName: TaskName, checkpoint:
Checkpoint): OutgoingMessageEnvelope = {
+ val key = new
KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, taskName,
expectedGrouperFactory)
+ val keyBytes = try {
+ checkpointKeySerde.toBytes(key)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing
checkpoint-key for $taskName: $checkpoint", e)
+ }
+ val msgBytes = try {
+ checkpointMsgSerde.toBytes(checkpoint)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing
checkpoint for $taskName: $checkpoint", e)
+ }
+
+ new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+ }
}
diff --git
a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
index b648b1c..614aaba 100644
---
a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
+++
b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
@@ -50,4 +50,16 @@ public class TestKafkaCheckpointLogKeySerde {
// test that deserialize(serialize(k)) == k
Assert.assertEquals(key,
checkpointSerde.fromBytes(checkpointSerde.toBytes(key)));
}
+
+ @Test
+ public void testForwardsCompatibility() {
+ // Set the key to another value, this is for the future if we want to
support multiple checkpoint keys
+ // we do not want to throw in the Serdes layer, but must be validated in
the CheckpointManager
+ KafkaCheckpointLogKey key = new KafkaCheckpointLogKey("checkpoint-v2",
+ new TaskName("Partition 0"),
GroupByPartitionFactory.class.getCanonicalName());
+ KafkaCheckpointLogKeySerde checkpointSerde = new
KafkaCheckpointLogKeySerde();
+
+ // test that deserialize(serialize(k)) == k
+ Assert.assertEquals(key,
checkpointSerde.fromBytes(checkpointSerde.toBytes(key)));
+ }
}
diff --git
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 7d6db64..2671067 100644
---
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -127,6 +127,43 @@ class TestKafkaCheckpointManager extends
KafkaServerTestHarness {
}
@Test
+ def testReadCheckpointShouldIgnoreUnknownCheckpointKeys(): Unit = {
+ val checkpointTopic = "checkpoint-topic-1"
+ val kcm1 = createKafkaCheckpointManager(checkpointTopic)
+ kcm1.register(taskName)
+ kcm1.createResources
+ kcm1.start
+ kcm1.stop
+
+ // check that start actually creates the topic with log compaction enabled
+ val topicConfig =
adminZkClient.getAllTopicConfigs().getOrElse(checkpointTopic, new Properties())
+
+ assertEquals(topicConfig, new
KafkaConfig(config).getCheckpointTopicProperties())
+ assertEquals("compact", topicConfig.get("cleanup.policy"))
+ assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+ // read before topic exists should result in a null checkpoint
+ val readCp = readCheckpoint(checkpointTopic, taskName)
+ assertNull(readCp)
+
+ // skips unknown checkpoints from checkpoint topic
+ writeCheckpoint(checkpointTopic, taskName, checkpoint1, "checkpoint-v2")
+ assertNull(readCheckpoint(checkpointTopic, taskName))
+
+ // reads latest v1 checkpoints
+ writeCheckpoint(checkpointTopic, taskName, checkpoint1)
+ assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName))
+
+ // writing checkpoint v2 still returns the previous v1 checkpoint
+ writeCheckpoint(checkpointTopic, taskName, checkpoint2, "checkpoint-v2")
+ assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName))
+
+ // writing checkpoint2 with the correct key returns the checkpoint2
+ writeCheckpoint(checkpointTopic, taskName, checkpoint2)
+ assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
+ }
+
+ @Test
def testWriteCheckpointShouldRetryFiniteTimesOnFailure(): Unit = {
val checkpointTopic = "checkpoint-topic-2"
val mockKafkaProducer: SystemProducer =
Mockito.mock(classOf[SystemProducer])
@@ -251,7 +288,8 @@ class TestKafkaCheckpointManager extends
KafkaServerTestHarness {
.build())
}
- private def createKafkaCheckpointManager(cpTopic: String, serde:
CheckpointSerde = new CheckpointSerde, failOnTopicValidation: Boolean = true) =
{
+ private def createKafkaCheckpointManager(cpTopic: String, serde:
CheckpointSerde = new CheckpointSerde,
+ failOnTopicValidation: Boolean = true, checkpointKey: String =
KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE) = {
val kafkaConfig = new org.apache.samza.config.KafkaConfig(config)
val props = kafkaConfig.getCheckpointTopicProperties()
val systemName = kafkaConfig.getCheckpointSystem.getOrElse(
@@ -264,7 +302,7 @@ class TestKafkaCheckpointManager extends
KafkaServerTestHarness {
val systemFactory = ReflectionUtil.getObj(systemFactoryClassName,
classOf[SystemFactory])
val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1,
props)
- new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation,
config, new NoOpMetricsRegistry, serde)
+ new MockKafkaCheckpointManager(spec, systemFactory, failOnTopicValidation,
serde, checkpointKey)
}
private def readCheckpoint(checkpointTopic: String, taskName: TaskName) :
Checkpoint = {
@@ -276,8 +314,9 @@ class TestKafkaCheckpointManager extends
KafkaServerTestHarness {
checkpoint
}
- private def writeCheckpoint(checkpointTopic: String, taskName: TaskName,
checkpoint: Checkpoint): Unit = {
- val kcm = createKafkaCheckpointManager(checkpointTopic)
+ private def writeCheckpoint(checkpointTopic: String, taskName: TaskName,
checkpoint: Checkpoint,
+ checkpointKey: String = KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE): Unit =
{
+ val kcm = createKafkaCheckpointManager(checkpointTopic, checkpointKey =
checkpointKey)
kcm.register(taskName)
kcm.start
kcm.writeCheckpoint(taskName, checkpoint)
@@ -300,4 +339,25 @@ class TestKafkaCheckpointManager extends
KafkaServerTestHarness {
}
}
+ class MockKafkaCheckpointManager(spec: KafkaStreamSpec, systemFactory:
SystemFactory, failOnTopicValidation: Boolean,
+ serde: CheckpointSerde = new CheckpointSerde, checkpointKey: String)
+ extends KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation,
config,
+ new NoOpMetricsRegistry, serde) {
+ override def buildOutgoingMessageEnvelope(taskName: TaskName, checkpoint:
Checkpoint): OutgoingMessageEnvelope = {
+ val key = new KafkaCheckpointLogKey(checkpointKey, taskName,
expectedGrouperFactory)
+ val keySerde = new KafkaCheckpointLogKeySerde
+ val checkpointMsgSerde = new CheckpointSerde
+ val keyBytes = try {
+ keySerde.toBytes(key)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing
checkpoint-key for $taskName: $checkpoint", e)
+ }
+ val msgBytes = try {
+ checkpointMsgSerde.toBytes(checkpoint)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing
checkpoint for $taskName: $checkpoint", e)
+ }
+ new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+ }
+ }
}