This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a9d7f8a MINOR: Fix Streams scala format violations (#5472)
a9d7f8a is described below
commit a9d7f8a1fdfa129105c6b3a128a91524874071ef
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Wed Aug 8 01:20:42 2018 +0530
MINOR: Fix Streams scala format violations (#5472)
Reviewers: Guozhang Wang <[email protected]>
---
.../kafka/streams/scala/kstream/KStream.scala | 1 -
.../apache/kafka/streams/scala/TopologyTest.scala | 30 ++++++++++++----------
2 files changed, 17 insertions(+), 14 deletions(-)
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index c02939a..a8766bd 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -299,7 +299,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
stateStoreNames: String*): KStream[K1, V1] =
inner.transform(transformerSupplier.asTransformerSupplier,
stateStoreNames: _*)
-
/**
* Transform the value of each input record into a new value (with possible
new type) of the output record.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`)
is applied to each input
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 194abf5..8a0eabb 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -209,12 +209,15 @@ class TopologyTest extends JUnitSuite {
val _: KTable[String, Long] =
textLines
- .transform(() => new Transformer[String, String, KeyValue[String,
String]] {
- override def init(context: ProcessorContext): Unit = Unit
- override def transform(key: String, value: String):
KeyValue[String, String] =
- new KeyValue(key, value.toLowerCase)
- override def close(): Unit = Unit
- })
+ .transform(
+ () =>
+ new Transformer[String, String, KeyValue[String, String]] {
+ override def init(context: ProcessorContext): Unit = Unit
+ override def transform(key: String, value: String):
KeyValue[String, String] =
+ new KeyValue(key, value.toLowerCase)
+ override def close(): Unit = Unit
+ }
+ )
.groupBy((k, v) => v)
.count()
@@ -229,15 +232,16 @@ class TopologyTest extends JUnitSuite {
val lowered: KStreamJ[String, String] = textLines
.transform(new TransformerSupplier[String, String, KeyValue[String,
String]] {
- override def get(): Transformer[String, String, KeyValue[String,
String]] = new Transformer[String, String, KeyValue[String, String]] {
- override def init(context: ProcessorContext): Unit = Unit
+ override def get(): Transformer[String, String, KeyValue[String,
String]] =
+ new Transformer[String, String, KeyValue[String, String]] {
+ override def init(context: ProcessorContext): Unit = Unit
- override def transform(key: String, value: String): KeyValue[String,
String] =
- new KeyValue(key, value.toLowerCase)
+ override def transform(key: String, value: String):
KeyValue[String, String] =
+ new KeyValue(key, value.toLowerCase)
- override def close(): Unit = Unit
- }
- })
+ override def close(): Unit = Unit
+ }
+ })
val grouped: KGroupedStreamJ[String, String] = lowered.groupBy {
new KeyValueMapper[String, String, String] {