This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0b65dbf [Minor test fix] Enabled and fixed a
TestKafkaCheckpointManager test (#1382)
0b65dbf is described below
commit 0b65dbf0e25a030c249791a485f3b46d79cec64f
Author: Cameron Lee <[email protected]>
AuthorDate: Fri Jun 12 15:31:25 2020 -0700
[Minor test fix] Enabled and fixed a TestKafkaCheckpointManager test (#1382)
---
.../org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 2e7a7e4..7d6db64 100644
---
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -66,6 +66,7 @@ class TestKafkaCheckpointManager extends
KafkaServerTestHarness {
props.map(_root_.kafka.server.KafkaConfig.fromProps)
}
+ @Test
def testWriteCheckpointShouldRecreateSystemProducerOnFailure(): Unit = {
val checkpointTopic = "checkpoint-topic-2"
val mockKafkaProducer: SystemProducer =
Mockito.mock(classOf[SystemProducer])
@@ -82,7 +83,6 @@ class TestKafkaCheckpointManager extends
KafkaServerTestHarness {
val spec = new KafkaStreamSpec("id", checkpointTopic,
checkpointSystemName, 1, 1, props)
val checkPointManager = Mockito.spy(new KafkaCheckpointManager(spec, new
MockSystemFactory, false, config, new NoOpMetricsRegistry))
val newKafkaProducer: SystemProducer =
Mockito.mock(classOf[SystemProducer])
- checkPointManager.MaxRetryDurationInMillis = 1
Mockito.doReturn(newKafkaProducer).when(checkPointManager).getSystemProducer()