Hi Juri,

many thanks - your suggestion helped a lot.
I was able to go forward.

The framework for adding a KafkaTopicSink is ready.
My end-2-end test works and I can go on an bring in the KafkaClient to
finish this task,

Cheers,
Mirko



Am Mo., 12. Feb. 2024 um 10:42 Uhr schrieb Juri Petersen
<j...@itu.dk.invalid>:

> Hey Mirko,
> to me it looks like you defined these inputs as you need them for
> formatting the data you write (as in a UDF, a lambda or similar) and in
> order to instantiate a LoadProfileEstimator (see
> https://github.com/apache/incubator-wayang/blob/ba23d214d93025898e5946b44438abb9e8d1fd32/wayang-benchmark/src/main/scala/org/apache/wayang/apps/simwords/Word2NVec.scala#L79
> on how to do this from a config).
> In your code (
> https://github.com/apache/incubator-wayang/blob/cd7b6ea94d12c733cd23780f0b290178e44aab0e/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala#L785)
> you define the LoadProfileEstimator as optional.
> Have you tried running the .writeKafkaTopic with a lambda that passes the
> input as is?
>
> Best,
> Juri
> ________________________________
> From: Mirko Kämpf <mirko.kae...@gmail.com>
> Sent: 12 February 2024 09:37
> To: dev@wayang.apache.org <dev@wayang.apache.org>
> Subject: Next steps towards KafkaTopicSink
>
> [You don't often get email from mirko.kae...@gmail.com. Learn why this is
> important at https://aka.ms/LearnAboutSenderIdentification ]
>
> Hey Wayang team,
>
> I have added the KafkaTopicSink incl. Mappings and the code compiles
> already, which is good for a Monday morning session ;-)
>
> Here is my question:
>
> In the example code which I have taken out of my WordCount Example, I need
> FORMATTER_UDF and UDF_LOAD_PROFILE_ESTIMATOR in order to
> call the new writeKafkaTopic(...) function.
>
> Can somebody please provide me with a pointer to a place in the code or
> docs, where I can read how to instantiate those objects?
>
> Many thanks,
> and have a creat start into the week.
>
> Cheers,
> Mirk
>
> Object FORMATTER_UDF = null;
> Object UDF_LOAD_PROFILE_ESTIMATOR = null;
>
> // Get a plan builder.
> WayangContext wayangContext = new WayangContext(new Configuration())
>         .withPlugin(Java.basicPlugin());
> //        .withPlugin(Spark.basicPlugin());
> JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
>         .withJobName(String.format("WordCount (%s)", topicName))
>         .withUdfJarOf(KafkaTopicWordCount.class);
>
> // Start building the WayangPlan.
> Collection<Tuple2<String, Integer>> wordcounts = planBuilder
>         // Read the text file.
>         .readKafkaTopic(topicName).withName("Load data from topic")
>
>         // Split each line by non-word characters.
>         .flatMap(line -> Arrays.asList(line.split("\\W+")))
>         .withSelectivity(10, 100, 0.9)
>         .withName("Split words")
>
>         // Filter empty tokens.
>         .filter(token -> !token.isEmpty())
>         .withSelectivity(0.99, 0.99, 0.99)
>         .withName("Filter empty words")
>
>         // Attach counter to each word.
>         .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To
> lower case, add counter")
>
>         // Sum up counters for every word.
>         .reduceByKey(
>                 Tuple2::getField0,
>                 (t1, t2) -> new Tuple2<>(t1.getField0(),
> t1.getField1() + t2.getField1())
>         )
>         .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9,
> 1, false, in -> Math.round(0.01 * in[0])))
>         .withName("Add counters")
>
>         // Execute the plan and collect the results.
>         //.collect();
>         .writeKafkaTopic("test", FORMATTER_UDF, "job_test_1",
> UDF_LOAD_PROFILE_ESTIMATOR );
>
>
> --
>
> Dr. rer. nat. Mirko Kämpf
> Müchelner Str. 23
> 06259 Frankleben
>


-- 

Dr. rer. nat. Mirko Kämpf
Müchelner Str. 23
06259 Frankleben

Reply via email to