Hi @vvcephei ,

I tried this approach with two implicit conversions, and it works. The code 
looks roughly like this:

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext

import scala.language.implicitConversions

object TransformerImplicits {

  // Adapts a Transformer that returns Scala tuples to one that returns KeyValue.
  implicit def scalaTransformerWrapper[K, V, K1, V1](
    scalaTransformer: Transformer[K, V, (K1, V1)]
  ): Transformer[K, V, KeyValue[K1, V1]] =
    new Transformer[K, V, KeyValue[K1, V1]] {
      override def init(context: ProcessorContext): Unit = scalaTransformer.init(context)
      override def transform(key: K, value: V): KeyValue[K1, V1] =
        scalaTransformer.transform(key, value) match {
          case (k1, v1) => KeyValue.pair(k1, v1)
          case _ => null // a null result (nothing to forward) is passed through as null
        }
      override def close(): Unit = scalaTransformer.close()
    }

  // Adapts a Scala function to a Java TransformerSupplier (needed for Scala 2.11).
  implicit def supplierWrapper[K, V, K1, V1](
    supplier: () => Transformer[K, V, KeyValue[K1, V1]]
  ): TransformerSupplier[K, V, KeyValue[K1, V1]] =
    new TransformerSupplier[K, V, KeyValue[K1, V1]] {
      override def get(): Transformer[K, V, KeyValue[K1, V1]] = supplier()
    }
}
```

But this code feels a bit alien in the current kafka-streams-scala. Other 
scala.kstream.KStream methods accept native Scala functions and convert them 
internally into Java objects compatible with the Java KStream methods. Take 
KStream.filter(), for example: filter(predicate: (K, V) => Boolean) accepts a 
plain Scala function and only internally converts it into a Java-compatible 
Predicate. This spares the user from importing extra implicit conversions into 
scope, and it follows the "Principle of Least Power".
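
For illustration, the internal conversion described above can be sketched 
roughly as follows. This is a minimal sketch, not the actual 
kafka-streams-scala source: FilterOnlyStream is a hypothetical wrapper class 
invented for this example, and only the Java KStream.filter() and Predicate 
types come from Kafka itself.

```scala
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, Predicate}

// Hypothetical wrapper (not part of kafka-streams-scala) around a Java KStream.
class FilterOnlyStream[K, V](val inner: KStreamJ[K, V]) {

  // Accepts a plain Scala function; the Java Predicate is built internally,
  // so the caller needs no implicit conversions in scope.
  def filter(predicate: (K, V) => Boolean): FilterOnlyStream[K, V] = {
    val javaPredicate = new Predicate[K, V] {
      override def test(key: K, value: V): Boolean = predicate(key, value)
    }
    new FilterOnlyStream(inner.filter(javaPredicate))
  }
}
```

A caller would then simply write something like 
stream.filter((key, value) => value != null), with no extra imports.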

If anything, I would say it is the current KStream.transform() 
(https://github.com/apache/kafka/blob/f123d2f18c55b1cf2edb452aeb87e6ad0743c292/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala)
that doesn't really belong there: it is the only method (that I can see) that 
uses the Java API directly, simply forwarding to the underlying Java KStream 
object without adapting it to Scala.

Furthermore, the transform() that this pull request restores (def 
transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, 
V1]], stateStoreNames: String*)) comes directly from Lightbend's original 
KStreamS.transform() (and it looks to me like that is where the first version 
of Apache's kafka-streams-scala came from). Even the original Scala 
KStream.transform() in version 2.0.0 tried to offer a Scala-friendly API. It 
accepted the Transformer object itself rather than a function, which led to 
these problems, but the intention was definitely there.

In the end, it looks to me that:

- providing a pure Java API on the Scala wrapper doesn't mesh well with the 
rest of the kafka-streams-scala API
- having users bring more implicit conversions into scope (which already holds 
the implicit Serdes/Produced/Consumed) would place an unnecessary burden on 
them, and wouldn't follow the pattern of the other existing APIs
- either way, it wouldn't follow the intention of having a Scala-friendly 
wrapper over the Java API
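
For comparison, a function-accepting transform() that does the conversion 
internally could be sketched roughly like this. It is only a sketch under 
stated assumptions: TransformAdapter and ScalaFriendlyStream are hypothetical 
names invented for this example, and it assumes the Kafka 2.x Java API, where 
KStream.transform(TransformerSupplier, String...) is available.

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext

// Hypothetical helper: turns a Scala supplier of tuple-returning Transformers
// into a Java TransformerSupplier of KeyValue-returning Transformers.
object TransformAdapter {
  def toJavaSupplier[K, V, K1, V1](
    supplier: () => Transformer[K, V, (K1, V1)]
  ): TransformerSupplier[K, V, KeyValue[K1, V1]] =
    new TransformerSupplier[K, V, KeyValue[K1, V1]] {
      override def get(): Transformer[K, V, KeyValue[K1, V1]] = {
        val scalaTransformer = supplier() // a fresh Transformer per get() call
        new Transformer[K, V, KeyValue[K1, V1]] {
          override def init(context: ProcessorContext): Unit = scalaTransformer.init(context)
          override def transform(key: K, value: V): KeyValue[K1, V1] =
            scalaTransformer.transform(key, value) match {
              case (k1, v1) => KeyValue.pair(k1, v1)
              case _ => null // null means "nothing to forward"
            }
          override def close(): Unit = scalaTransformer.close()
        }
      }
    }
}

// Hypothetical wrapper (not the real scala.kstream.KStream).
class ScalaFriendlyStream[K, V](val inner: KStreamJ[K, V]) {
  def transform[K1, V1](
    transformerSupplier: () => Transformer[K, V, (K1, V1)],
    stateStoreNames: String*
  ): ScalaFriendlyStream[K1, V1] =
    new ScalaFriendlyStream(
      inner.transform[K1, V1](TransformAdapter.toJavaSupplier(transformerSupplier), stateStoreNames: _*)
    )
}
```

With this shape the user passes a plain () => Transformer and never sees 
KeyValue or TransformerSupplier; the cost is one extra tuple allocation per 
record.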

I don't really know whether the existing pure Java .transform() should be kept 
in the API. On one hand, it is probably nice for some extra performance, as 
there is no need to unpack each tuple and turn it into a KeyValue instance. On 
the other hand, no other KStream method exposes its pure Java API directly.

Another point is that there is no need to convert () => Transformer explicitly 
in Scala 2.12, as Scala 2.12 supports SAM conversion directly. But again, I 
don't really know how important support for Scala 2.11 is.
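
To make the SAM point concrete, here is a small dependency-free sketch. It 
uses java.util.function.Supplier as a stand-in, on the assumption that 
TransformerSupplier behaves the same way since it also has a single abstract 
get() method; SamDemo and toSupplier are names made up for this example.

```scala
import java.util.function.Supplier

object SamDemo {
  // Scala 2.12+: a function literal is converted to the SAM type directly.
  val viaSam: Supplier[String] = () => "transformer"

  // Scala 2.11-compatible: the conversion has to be spelled out by hand.
  def toSupplier[T](f: () => T): Supplier[T] = new Supplier[T] {
    override def get(): T = f()
  }

  val viaWrapper: Supplier[String] = toSupplier(() => "transformer")
}
```

Both values behave identically at runtime; the explicit wrapper is only there 
for compilers without SAM conversion.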

Please let me know what you think. I'd like to first come to some conclusion 
about what kind of code would be preferable for KStream.transform() before 
actually making further changes and moving forward with a KIP.

[ Full content available at: https://github.com/apache/kafka/pull/5619 ]