Hey Mirko,
it looks to me as if you defined these inputs because you need the first one 
to format the data you write (a UDF, a lambda or similar) and the second one 
is a LoadProfileEstimator that you have to instantiate (see 
https://github.com/apache/incubator-wayang/blob/ba23d214d93025898e5946b44438abb9e8d1fd32/wayang-benchmark/src/main/scala/org/apache/wayang/apps/simwords/Word2NVec.scala#L79
 on how to do this from a config).
In your code 
(https://github.com/apache/incubator-wayang/blob/cd7b6ea94d12c733cd23780f0b290178e44aab0e/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala#L785)
 you define the LoadProfileEstimator as optional.
Have you tried calling .writeKafkaTopic with a lambda that passes the input 
through as is?
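
For illustration, here is a minimal sketch of what such a formatter lambda 
could look like. It assumes that the formatter simply maps one data quantum 
to the String that is written to the topic. SerializableFunction and 
WordCount below are hypothetical stand-ins so that the snippet runs without 
Wayang on the classpath; they are not Wayang's own types, and the exact 
parameter types of writeKafkaTopic should be checked against the code.

```java
import java.io.Serializable;
import java.util.function.Function;

final class KafkaFormatterSketch {

    /** Hypothetical stand-in for a serializable UDF type (assumption, not the Wayang interface). */
    interface SerializableFunction<I, O> extends Function<I, O>, Serializable {
    }

    /** Hypothetical stand-in for a (word, count) tuple with getField0()/getField1() accessors. */
    static final class WordCount implements Serializable {
        private final String word;
        private final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        String getField0() {
            return word;
        }

        int getField1() {
            return count;
        }
    }

    /** Passes the input through as is; non-String values fall back to their string form. */
    static final SerializableFunction<Object, String> PASS_THROUGH =
            value -> String.valueOf(value);

    /** Formats a (word, count) pair as one tab-separated line. */
    static final SerializableFunction<WordCount, String> TAB_SEPARATED =
            wc -> wc.getField0() + "\t" + wc.getField1();

    public static void main(String[] args) {
        // Apply both formatters locally to see what would end up in the topic.
        System.out.println(PASS_THROUGH.apply("hello wayang"));
        System.out.println(TAB_SEPARATED.apply(new WordCount("wayang", 3)));
    }
}
```

In your plan, a lambda of this shape would take the place of FORMATTER_UDF. 
Since the LoadProfileEstimator is declared as optional, the sketch leaves it 
out entirely.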

Best,
Juri
________________________________
From: Mirko Kämpf <mirko.kae...@gmail.com>
Sent: 12 February 2024 09:37
To: dev@wayang.apache.org <dev@wayang.apache.org>
Subject: Next steps towards KafkaTopicSink

Hey Wayang team,

I have added the KafkaTopicSink incl. Mappings and the code compiles
already, which is good for a Monday morning session ;-)

Here is my question:

In the example code which I have taken out of my WordCount Example, I need
FORMATTER_UDF and UDF_LOAD_PROFILE_ESTIMATOR in order to
call the new writeKafkaTopic(...) function.

Can somebody please provide me with a pointer to a place in the code or
docs, where I can read how to instantiate those objects?

Many thanks,
and have a great start into the week.

Cheers,
Mirko

Object FORMATTER_UDF = null;
Object UDF_LOAD_PROFILE_ESTIMATOR = null;

// Get a plan builder.
WayangContext wayangContext = new WayangContext(new Configuration())
        .withPlugin(Java.basicPlugin());
//        .withPlugin(Spark.basicPlugin());
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
        .withJobName(String.format("WordCount (%s)", topicName))
        .withUdfJarOf(KafkaTopicWordCount.class);

// Start building the WayangPlan.
Collection<Tuple2<String, Integer>> wordcounts = planBuilder
        // Read the lines from the Kafka topic.
        .readKafkaTopic(topicName).withName("Load data from topic")

        // Split each line by non-word characters.
        .flatMap(line -> Arrays.asList(line.split("\\W+")))
        .withSelectivity(10, 100, 0.9)
        .withName("Split words")

        // Filter empty tokens.
        .filter(token -> !token.isEmpty())
        .withSelectivity(0.99, 0.99, 0.99)
        .withName("Filter empty words")

        // Attach counter to each word.
        .map(word -> new Tuple2<>(word.toLowerCase(), 1))
        .withName("To lower case, add counter")

        // Sum up counters for every word.
        .reduceByKey(
                Tuple2::getField0,
                (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
        )
        .withCardinalityEstimator(new DefaultCardinalityEstimator(
                0.9, 1, false, in -> Math.round(0.01 * in[0])))
        .withName("Add counters")

        // Execute the plan and collect the results.
        //.collect();
        .writeKafkaTopic("test", FORMATTER_UDF, "job_test_1", UDF_LOAD_PROFILE_ESTIMATOR);


--

Dr. rer. nat. Mirko Kämpf
Müchelner Str. 23
06259 Frankleben
