Hey Wayang people,

I have been working on the Apache Spark/Apache Kafka/Apache Wayang integration.
So far I have made good progress. My current blockers are related to the serialization of the Spark tasks. I solved many of these NotSerializableExceptions by adding the Serializable interface to many of the classes involved in the processing. But now I have run into a lambda-related problem; I assume this is where Scala land begins. I found two independent but comparable situations, described below. Any hint that helps me solve these issues (probably of the same kind) is welcome.

Cheers,
Mirko

*Observations:*

*(1)* The formatter UDF:

    import java.io.Serializable;

    public class Util implements Serializable {
        // f1 is a String (%s), f2 is an Integer (%d)
        public static String formatData(String f1, Integer f2) {
            return String.format("%s, %d", f1, f2);
        }
    }

and in my WayangPlan:

    d -> Util.formatData(d.getField0(), d.getField1())

causes this NotSerializableException:

    Caused by: java.io.NotSerializableException: KafkaTopicWordCountSpark$$Lambda$503/0x00000008005ea040
        - field (class "org.apache.wayang.api.package$$anon$1", name: "scalaFunc$1", type: "interface scala.Function1")
        - object (class "org.apache.wayang.api.package$$anon$1", org.apache.wayang.api.package$$anon$1@5aede88b)
        - field (class "org.apache.wayang.core.function.TransformationDescriptor", name: "javaImplementation", type: "interface org.apache.wayang.core.function.FunctionDescriptor$SerializableFunction")
        - object (class "org.apache.wayang.core.function.TransformationDescriptor", TransformationDescriptor[org.apache.wayang.api.package$$anon$1@5aede88b])
        - field (class "org.apache.wayang.basic.operators.KafkaTopicSink", name: "formattingDescriptor", type: "class org.apache.wayang.core.function.TransformationDescriptor")
        - object (class "org.apache.wayang.basic.operators.KafkaTopicSink", KafkaTopicSink[Write to KafkaTopic test_23456])

*(2)* When I use null as the formatter UDF, I would expect a NullPointerException later, after the job has started. Instead I get another NotSerializableException, this time from DefaultCardinalityEstimator:

    Caused by: java.io.NotSerializableException: org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator$$Lambda$502/0x00000008005e9440
        - field (class "org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator", name: "singlePointEstimator", type: "interface java.util.function.ToLongBiFunction")
        - object (class "org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator", org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator@285b63c2)
        - element of array (index: 0)

--
Dr. rer. nat. Mirko Kämpf
Müchelner Str. 23
06259 Frankleben
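Both traces seem to share one root cause, assuming the lambdas involved are compiled by javac: a lambda is stored in a field whose declared type (scala.Function1 in (1), java.util.function.ToLongBiFunction in (2)) does not extend java.io.Serializable. The class generated for such a lambda does not implement Serializable. Adding the interface to the surrounding classes (Util, KafkaTopicSink, DefaultCardinalityEstimator) therefore cannot fix it. The minimal sketch below uses plain Java serialization, with no Spark or Wayang on the classpath, to show two common remedies: an intersection cast at the call site, and declaring the field with a serializable sub-interface. All names in it (LambdaSerializationDemo, SerializableToLongBiFunction, BrokenEstimator, FixedEstimator) are hypothetical, and java.util.function.Function stands in for scala.Function1. The same intersection cast should presumably work for the scala.Function1 target in (1) as well.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToLongBiFunction;

public class LambdaSerializationDemo {

    // Hypothetical sub-interface: a ToLongBiFunction that is also Serializable.
    // javac compiles any lambda targeting this type as a serializable lambda.
    interface SerializableToLongBiFunction<T, U>
            extends ToLongBiFunction<T, U>, Serializable {}

    // Hypothetical stand-in for the current situation in (2): the class is
    // Serializable, but the field's declared type is not.
    static class BrokenEstimator implements Serializable {
        private final ToLongBiFunction<Long, Long> singlePointEstimator;
        BrokenEstimator(ToLongBiFunction<Long, Long> f) { this.singlePointEstimator = f; }
    }

    // Same class with the field and constructor parameter typed as the
    // serializable sub-interface; call sites passing a lambda stay unchanged.
    static class FixedEstimator implements Serializable {
        private final SerializableToLongBiFunction<Long, Long> singlePointEstimator;
        FixedEstimator(SerializableToLongBiFunction<Long, Long> f) { this.singlePointEstimator = f; }
    }

    // Returns true if standard Java serialization of obj succeeds.
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static Map<String, Boolean> runChecks() {
        Map<String, Boolean> results = new LinkedHashMap<>();

        // Lambda typed as a plain functional interface: its class is not Serializable.
        Function<String, String> plain = s -> s.toUpperCase();
        results.put("plain lambda", isJavaSerializable(plain));

        // Intersection cast: javac generates a serializable lambda instead.
        Function<String, String> cast =
                (Function<String, String> & Serializable) s -> s.toUpperCase();
        results.put("intersection-cast lambda", isJavaSerializable(cast));

        // Serializable holder whose field holds a non-serializable lambda: fails as in (2).
        results.put("BrokenEstimator", isJavaSerializable(new BrokenEstimator((a, b) -> a * b)));

        // Field typed as the serializable sub-interface: the same lambda now serializes.
        results.put("FixedEstimator", isJavaSerializable(new FixedEstimator((a, b) -> a * b)));

        return results;
    }

    public static void main(String[] args) {
        runChecks().forEach((name, ok) -> System.out.println(name + ": " + ok));
    }
}
```

Running `main` prints `false`, `true`, `false`, `true` for the four cases, in that order. The traces also suggest that the whole KafkaTopicSink operator, including its descriptors and cardinality estimators, ends up in the Spark closure. If so, that would presumably explain why a null formatter just moves the failure to the next non-serializable field instead of producing a NullPointerException. It may be worth checking whether only the formatting function, rather than the operator, can be captured.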