Hi @vvcephei ,
I tried this approach with two implicit conversions, and it works; the code looks roughly like this:
```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext

import scala.language.implicitConversions

object TransformerImplicits {

  // Adapts a Transformer that emits Scala tuples into one that emits Java KeyValues.
  implicit def scalaTransformerWrapper[K, V, K1, V1](
      scalaTransformer: Transformer[K, V, (K1, V1)]
  ): Transformer[K, V, KeyValue[K1, V1]] =
    new Transformer[K, V, KeyValue[K1, V1]] {
      override def init(context: ProcessorContext): Unit =
        scalaTransformer.init(context)

      override def transform(key: K, value: V): KeyValue[K1, V1] =
        scalaTransformer.transform(key, value) match {
          case (k1, v1) => KeyValue.pair(k1, v1)
          case _        => null // only a null result reaches here; forward it as "emit nothing"
        }

      override def close(): Unit = scalaTransformer.close()
    }

  // Lifts a plain Scala function into a TransformerSupplier.
  implicit def supplierWrapper[K, V, K1, V1](
      supplier: () => Transformer[K, V, KeyValue[K1, V1]]
  ): TransformerSupplier[K, V, KeyValue[K1, V1]] =
    new TransformerSupplier[K, V, KeyValue[K1, V1]] {
      override def get(): Transformer[K, V, KeyValue[K1, V1]] = supplier()
    }
}
```
But this code really feels a bit alien in the current kafka-streams-scala. What I mean is that the other scala.kstream.KStream methods accept native Scala functions and convert them internally into Java objects compatible with the Java KStream methods. Take KStream.filter() for example: `filter(predicate: (K, V) => Boolean)` accepts a plain Scala function and only internally converts it into a Java-compatible Predicate. This way the user is spared from having to import extra implicit conversions into scope, and it follows the "Principle of Least Power".
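For illustration, a transform() in the same spirit could accept a plain Scala function and do the KeyValue adaptation internally. This is only a rough sketch; I'm assuming `inner` is the wrapped Java KStream, as in the other scala.kstream.KStream methods, and reusing the tuple-to-KeyValue adapter from above:

```scala
// Hypothetical, function-accepting transform() on the Scala KStream wrapper.
// `inner` is assumed to be the underlying Java KStream[K, V].
def transform[K1, V1](
    getTransformer: () => Transformer[K, V, (K1, V1)],
    stateStoreNames: String*
): KStream[K1, V1] =
  new KStream(
    inner.transform(
      new TransformerSupplier[K, V, KeyValue[K1, V1]] {
        override def get(): Transformer[K, V, KeyValue[K1, V1]] =
          // reuse the tuple-to-KeyValue adapter shown above
          TransformerImplicits.scalaTransformerWrapper(getTransformer())
      },
      stateStoreNames: _*
    )
  )
```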
If anything, I would say that the current KStream.transform()
(https://github.com/apache/kafka/blob/f123d2f18c55b1cf2edb452aeb87e6ad0743c292/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala)
doesn't really belong there, as it's the only method (that I can see) that uses the Java API directly, simply forwarding to the underlying Java KStream object without adapting it to Scala.
Furthermore, the transform() that this pull request restores (`def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]], stateStoreNames: String*)`) comes directly from the original Lightbend KStreamS.transform() (and it looks to me that this is where the first version of Apache's kafka-streams-scala came from). Even the original Scala KStream.transform() in 2.0.0 tried to make a Scala-friendly API available; it accepted the Transformer object itself rather than a function, which led us to these problems, but the intention was definitely there.
In the end it looks to me that:
- providing a pure Java API on the Scala wrapper doesn't really mesh well with the rest of the kafka-streams-scala API
- having the user bring more implicit conversions into scope (which already holds implicit Serdes/Produced/Consumed) would place an unnecessary burden on the user, and also wouldn't follow the pattern of the other existing APIs (see the call-site sketch below)
- it doesn't follow the intention of having a Scala-friendly wrapper over a Java API
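To make the second point concrete, a call site with such a function-accepting transform() could look something like this (the Transformer body is purely illustrative), with no extra implicit imports needed:

```scala
// Hypothetical call site: the user passes a plain Scala function returning
// a Transformer that emits Scala tuples; no implicit conversions in scope.
val lengths: KStream[String, Long] = stream.transform(
  () =>
    new Transformer[String, String, (String, Long)] {
      override def init(context: ProcessorContext): Unit = ()
      override def transform(key: String, value: String): (String, Long) =
        (key, value.length.toLong)
      override def close(): Unit = ()
    }
)
```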
I don't really know whether the existing pure Java .transform() should be kept in the API. On the one hand it's probably nice for some extra performance, as there's no need to unpack each tuple and turn it into a KeyValue instance; on the other hand, no other KStream method has its pure Java API exposed directly.
Another point is that there's no need to convert `() => Transformer` in Scala 2.12, as Scala 2.12 directly supports SAM conversion. But again, I don't really know how important support for Scala 2.11 is.
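Just to illustrate the SAM point (the Transformer body here is arbitrary), on 2.12 a function literal is accepted directly where the single-method TransformerSupplier is expected:

```scala
// Scala 2.12 SAM conversion: the function literal is converted to the
// single-abstract-method TransformerSupplier without any wrapper.
val supplier: TransformerSupplier[String, String, KeyValue[String, Long]] =
  () =>
    new Transformer[String, String, KeyValue[String, Long]] {
      override def init(context: ProcessorContext): Unit = ()
      override def transform(key: String, value: String): KeyValue[String, Long] =
        KeyValue.pair(key, value.length.toLong)
      override def close(): Unit = ()
    }
```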
Please let me know what you think, as I'd like to come to a conclusion about what kind of code would be preferable for KStream.transform() before making further changes and progressing to a KIP.