This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 4156ea0 KAFKA-7316: Fix Streams Scala filter recursive call #5538
4156ea0 is described below
commit 4156ea0a9bcca67d209fd3b43d2268c9abd5a0b5
Author: Joan Goyeau <[email protected]>
AuthorDate: Thu Aug 23 17:15:27 2018 +0100
KAFKA-7316: Fix Streams Scala filter recursive call #5538
Due to lack of conversion to kstream Predicate, existing filter method in
KTable.scala would result in StackOverflowError.
This PR fixes the bug and adds testing for it.
Reviewers: Guozhang Wang <[email protected]>, John Roesler
<[email protected]>
---
.../kafka/streams/scala/kstream/KTable.scala | 4 +-
.../apache/kafka/streams/scala/KStreamTest.scala | 48 ++++++++++++++++
.../apache/kafka/streams/scala/KTableTest.scala | 66 ++++++++++++++++++++++
3 files changed, 116 insertions(+), 2 deletions(-)
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index a78d321..d41496f 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -47,7 +47,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filter`
*/
def filter(predicate: (K, V) => Boolean): KTable[K, V] =
- inner.filter(predicate(_, _))
+ inner.filter(predicate.asPredicate)
/**
* Create a new [[KTable]] that consists all records of this [[KTable]]
which satisfies the given
@@ -71,7 +71,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
*/
def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
- inner.filterNot(predicate(_, _))
+ inner.filterNot(predicate.asPredicate)
/**
* Create a new [[KTable]] that consists all records of this [[KTable]]
which do <em>not</em> satisfy the given
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
index 6a302b2..2e2132d 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -29,6 +29,52 @@ import org.scalatest.{FlatSpec, Matchers}
@RunWith(classOf[JUnitRunner])
class KStreamTest extends FlatSpec with Matchers with TestDriver {
+ "filter a KStream" should "filter records satisfying the predicate" in {
+ val builder = new StreamsBuilder()
+ val sourceTopic = "source"
+ val sinkTopic = "sink"
+
+ builder.stream[String, String](sourceTopic).filter((_, value) => value !=
"value2").to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+
+ testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+ testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
+ testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+ testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+ testDriver.pipeRecord(sourceTopic, ("3", "value3"))
+ testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3"
+
+ testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+ testDriver.close()
+ }
+
+ "filterNot a KStream" should "filter records not satisfying the predicate"
in {
+ val builder = new StreamsBuilder()
+ val sourceTopic = "source"
+ val sinkTopic = "sink"
+
+ builder.stream[String, String](sourceTopic).filterNot((_, value) => value
== "value2").to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+
+ testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+ testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
+ testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+ testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+ testDriver.pipeRecord(sourceTopic, ("3", "value3"))
+ testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3"
+
+ testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+ testDriver.close()
+ }
+
"selectKey a KStream" should "select a new key" in {
val builder = new StreamsBuilder()
val sourceTopic = "source"
@@ -44,6 +90,8 @@ class KStreamTest extends FlatSpec with Matchers with
TestDriver {
testDriver.pipeRecord(sourceTopic, ("1", "value2"))
testDriver.readRecord[String, String](sinkTopic).key shouldBe "value2"
+ testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
testDriver.close()
}
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
index 8c88ff5..2e9c821 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
@@ -29,6 +29,72 @@ import org.scalatest.{FlatSpec, Matchers}
@RunWith(classOf[JUnitRunner])
class KTableTest extends FlatSpec with Matchers with TestDriver {
+ "filter a KTable" should "filter records satisfying the predicate" in {
+ val builder = new StreamsBuilder()
+ val sourceTopic = "source"
+ val sinkTopic = "sink"
+
+ val table = builder.stream[String, String](sourceTopic).groupBy((key, _)
=> key).count()
+ table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+
+ {
+ testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+ val record = testDriver.readRecord[String, Long](sinkTopic)
+ record.key shouldBe "1"
+ record.value shouldBe (null: java.lang.Long)
+ }
+ {
+ testDriver.pipeRecord(sourceTopic, ("1", "value2"))
+ val record = testDriver.readRecord[String, Long](sinkTopic)
+ record.key shouldBe "1"
+ record.value shouldBe 2
+ }
+ {
+ testDriver.pipeRecord(sourceTopic, ("2", "value1"))
+ val record = testDriver.readRecord[String, Long](sinkTopic)
+ record.key shouldBe "2"
+ record.value shouldBe (null: java.lang.Long)
+ }
+ testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+ testDriver.close()
+ }
+
+ "filterNot a KTable" should "filter records not satisfying the predicate" in
{
+ val builder = new StreamsBuilder()
+ val sourceTopic = "source"
+ val sinkTopic = "sink"
+
+ val table = builder.stream[String, String](sourceTopic).groupBy((key, _)
=> key).count()
+ table.filterNot((_, value) => value > 1).toStream.to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+
+ {
+ testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+ val record = testDriver.readRecord[String, Long](sinkTopic)
+ record.key shouldBe "1"
+ record.value shouldBe 1
+ }
+ {
+ testDriver.pipeRecord(sourceTopic, ("1", "value2"))
+ val record = testDriver.readRecord[String, Long](sinkTopic)
+ record.key shouldBe "1"
+ record.value shouldBe (null: java.lang.Long)
+ }
+ {
+ testDriver.pipeRecord(sourceTopic, ("2", "value1"))
+ val record = testDriver.readRecord[String, Long](sinkTopic)
+ record.key shouldBe "2"
+ record.value shouldBe 1
+ }
+ testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+ testDriver.close()
+ }
+
"join 2 KTables" should "join correctly records" in {
val builder = new StreamsBuilder()
val sourceTopic1 = "source1"