Hey Wayang people,

I have been working on the Apache Spark / Apache Kafka / Apache Wayang integration.

Good progress so far.

My current blockers are related to the serialization of the Spark tasks.

I solved many of these NotSerializableExceptions by adding the Serializable
interface to the classes involved in the processing. But now I run into a
lambda-related problem, and I assume this is where Scala land begins.

I found two independent but comparable situations, both described further
down in this email.

Any hint that helps me solve these issues (they are probably of the same
kind) is welcome.

Cheers,
Mirko



*Observations:*

*(1)* the Formatter UDF

import java.io.Serializable;

public class Util implements Serializable {
    public static String formatData( String f1, Integer f2 ) {
        // f1 is a String and f2 an Integer, so the specifiers must be %s then %d
        return String.format("%s, %d", f1, f2);
    }
}

and in my WayangPlan:

d -> Util.formatData( d.getField0(), d.getField1() )

causes the following NotSerializableException:

Caused by: java.io.NotSerializableException:
KafkaTopicWordCountSpark$$Lambda$503/0x00000008005ea040
*        - field (class "org.apache.wayang.api.package$$anon$1", name:
"scalaFunc$1", type: "interface scala.Function1")*
        - object (class "org.apache.wayang.api.package$$anon$1",
org.apache.wayang.api.package$$anon$1@5aede88b)
        - field (class
"org.apache.wayang.core.function.TransformationDescriptor", name:
"javaImplementation", type: "interface
org.apache.wayang.core.function.FunctionDescriptor$SerializableFunction")
        - object (class
"org.apache.wayang.core.function.TransformationDescriptor",
TransformationDescriptor[org.apache.wayang.api.package$$anon$1@5aede88b])
        - field (class "org.apache.wayang.basic.operators.KafkaTopicSink",
name: "formattingDescriptor", type: "class
org.apache.wayang.core.function.TransformationDescriptor")
        - object (class "org.apache.wayang.basic.operators.KafkaTopicSink",
KafkaTopicSink[Write to KafkaTopic test_23456])
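
My current reading of this trace, as a minimal sketch with plain JDK classes
only (no Wayang, Spark, or Scala on the classpath): a Java lambda is only
serializable if its target type extends Serializable. The trace shows my
lambda sitting in a field of type scala.Function1, so I assume it was
compiled against that interface and is therefore a plain, non-serializable
lambda. Wrapper, plainLambda, and serializableLambda below are hypothetical
stand-ins, not Wayang classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSerializationSketch {

    // Stand-in for a wrapper object: serializable itself, but it keeps
    // the wrapped function in a non-transient field.
    static class Wrapper implements Serializable {
        private final Function<String, String> inner;

        Wrapper(Function<String, String> inner) {
            this.inner = inner;
        }
    }

    // Returns true if Java serialization accepts the whole object graph.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Target type is java.util.function.Function, which is not Serializable.
    static Function<String, String> plainLambda() {
        return s -> s.trim();
    }

    // The intersection cast adds Serializable to the lambda's target type.
    static Function<String, String> serializableLambda() {
        return (Function<String, String> & Serializable) s -> s.trim();
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new Wrapper(plainLambda())));        // false
        System.out.println(isSerializable(new Wrapper(serializableLambda()))); // true
    }
}
```

If that assumption holds, giving the lambda in my plan a serializable target
type (for example the FunctionDescriptor.SerializableFunction interface named
in the trace) might be worth a try, but I have not verified this against the
Wayang API yet.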




*(2)* When I pass null as the formatter UDF, I would expect a
NullPointerException some time after the job has started, but instead I get
another NotSerializableException, this time from DefaultCardinalityEstimator:


Caused by: java.io.NotSerializableException:
*org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator$$Lambda*
$502/0x00000008005e9440
        - field (class
"org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator",
name: "singlePointEstimator", type: "interface
java.util.function.ToLongBiFunction")
        - object (class
"org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator",
org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator@285b63c2
)
        - element of array (index: 0)
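
In this second trace the field singlePointEstimator has the type
java.util.function.ToLongBiFunction, which does not extend Serializable, so
a lambda stored there cannot be written even when the surrounding class is
Serializable. Below is a minimal sketch of one possible way around that,
again with plain JDK classes only. SerializableToLongBiFunction,
PlainEstimator, and FixedEstimator are names I made up for illustration; I
have not checked whether such a change fits DefaultCardinalityEstimator.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.ToLongBiFunction;

public class EstimatorSerializationSketch {

    // Hypothetical serializable variant of ToLongBiFunction.
    interface SerializableToLongBiFunction<T, U> extends ToLongBiFunction<T, U>, Serializable {
    }

    // Stand-in for an estimator that keeps its lambda in a plain ToLongBiFunction field.
    static class PlainEstimator implements Serializable {
        final ToLongBiFunction<long[], Object> singlePointEstimator;

        PlainEstimator(ToLongBiFunction<long[], Object> f) {
            this.singlePointEstimator = f;
        }
    }

    // Same shape, but the field type makes every assigned lambda serializable.
    static class FixedEstimator implements Serializable {
        final SerializableToLongBiFunction<long[], Object> singlePointEstimator;

        FixedEstimator(SerializableToLongBiFunction<long[], Object> f) {
            this.singlePointEstimator = f;
        }
    }

    // Writes the object with Java serialization and reads it back.
    static Object roundTrip(Object obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    // False if serializing the plain estimator fails with an I/O error,
    // which includes NotSerializableException.
    static boolean plainEstimatorSerializes() {
        try {
            roundTrip(new PlainEstimator((inputs, config) -> inputs[0]));
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    // Round-trips the fixed estimator and applies the deserialized lambda.
    static long estimateAfterRoundTrip(long input) {
        try {
            FixedEstimator copy = (FixedEstimator) roundTrip(
                    new FixedEstimator((inputs, config) -> inputs[0] * 2));
            return copy.singlePointEstimator.applyAsLong(new long[] {input}, null);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(plainEstimatorSerializes());  // false
        System.out.println(estimateAfterRoundTrip(21));  // 42
    }
}
```

An alternative would be to mark the field transient, if the estimator is not
needed on the Spark workers at all. I do not know yet which of the two is the
better fit here.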

-- 

Dr. rer. nat. Mirko Kämpf
Müchelner Str. 23
06259 Frankleben
