Hi Juri, many thanks. Your suggestion helped a lot, and I was able to move forward.
The framework for adding a KafkaTopicSink is ready. My end-to-end test works, and I can now go on and bring in the KafkaClient to finish this task.

Cheers,
Mirko

On Mon, 12 Feb 2024 at 10:42, Juri Petersen <j...@itu.dk.invalid> wrote:
> Hey Mirko,
>
> to me it looks like you defined these inputs because you need them for
> formatting the data you write (as in a UDF, a lambda or similar) and in
> order to instantiate a LoadProfileEstimator (see
> https://github.com/apache/incubator-wayang/blob/ba23d214d93025898e5946b44438abb9e8d1fd32/wayang-benchmark/src/main/scala/org/apache/wayang/apps/simwords/Word2NVec.scala#L79
> for how to do this from a config).
> In your code (
> https://github.com/apache/incubator-wayang/blob/cd7b6ea94d12c733cd23780f0b290178e44aab0e/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala#L785)
> you define the LoadProfileEstimator as optional.
> Have you tried running .writeKafkaTopic with a lambda that passes the
> input through as is?
>
> Best,
> Juri
> ________________________________
> From: Mirko Kämpf <mirko.kae...@gmail.com>
> Sent: 12 February 2024 09:37
> To: dev@wayang.apache.org <dev@wayang.apache.org>
> Subject: Next steps towards KafkaTopicSink
>
> Hey Wayang team,
>
> I have added the KafkaTopicSink, including the mappings, and the code
> already compiles, which is good for a Monday morning session ;-)
>
> Here is my question:
>
> In the example code below, which I have taken from my WordCount example,
> I need FORMATTER_UDF and UDF_LOAD_PROFILE_ESTIMATOR in order to
> call the new writeKafkaTopic(...) function.
>
> Can somebody please give me a pointer to a place in the code or
> docs where I can read how to instantiate those objects?
>
> Many thanks,
> and have a great start to the week.
>
> Cheers,
> Mirko
>
> Object FORMATTER_UDF = null;
> Object UDF_LOAD_PROFILE_ESTIMATOR = null;
>
> // Get a plan builder.
> WayangContext wayangContext = new WayangContext(new Configuration())
>         .withPlugin(Java.basicPlugin());
>         // .withPlugin(Spark.basicPlugin());
> JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
>         .withJobName(String.format("WordCount (%s)", topicName))
>         .withUdfJarOf(KafkaTopicWordCount.class);
>
> // Start building the WayangPlan.
> Collection<Tuple2<String, Integer>> wordcounts = planBuilder
>         // Read the records from the Kafka topic.
>         .readKafkaTopic(topicName).withName("Load data from topic")
>
>         // Split each line by non-word characters.
>         .flatMap(line -> Arrays.asList(line.split("\\W+")))
>         .withSelectivity(10, 100, 0.9)
>         .withName("Split words")
>
>         // Filter empty tokens.
>         .filter(token -> !token.isEmpty())
>         .withSelectivity(0.99, 0.99, 0.99)
>         .withName("Filter empty words")
>
>         // Attach a counter to each word.
>         .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")
>
>         // Sum up the counters for every word.
>         .reduceByKey(
>                 Tuple2::getField0,
>                 (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
>         )
>         .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
>         .withName("Add counters")
>
>         // Execute the plan and write the results to a Kafka topic
>         // (instead of collecting them).
>         //.collect();
>         .writeKafkaTopic("test", FORMATTER_UDF, "job_test_1", UDF_LOAD_PROFILE_ESTIMATOR);
>
>
> --
>
> Dr. rer. nat. Mirko Kämpf
> Müchelner Str. 23
> 06259 Frankleben
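Juri's suggestion (a formatter lambda that passes the input through, plus an optional load-profile estimator) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the Wayang API: `SerializableFunction` and `writeKafkaTopicSketch` are hypothetical stand-ins so the example runs without Wayang or Kafka on the classpath, the stand-in sink returns the formatted records instead of producing them to a topic, and `Map.Entry` replaces Wayang's `Tuple2` for the (word, count) pairs.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KafkaSinkFormatterSketch {

    // Hypothetical stand-in for a serializable UDF type: a Function that is also
    // Serializable, so a lambda could be shipped to the executing platform.
    @FunctionalInterface
    public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}

    // Hypothetical stand-in for writeKafkaTopic(...): applies the formatter to every
    // record and returns the resulting Strings instead of sending them to Kafka.
    // The estimator parameter is accepted but unused here; since it is optional in
    // the real signature, null is passed for it below.
    public static <T> List<String> writeKafkaTopicSketch(
            List<T> records,
            SerializableFunction<T, String> formatterUdf,
            Object udfLoadProfileEstimator) {
        List<String> produced = new ArrayList<>();
        for (T record : records) {
            produced.add(formatterUdf.apply(record));
        }
        return produced;
    }

    public static void main(String[] args) {
        // Pass-through formatter: the input already is a String, so send it as is.
        SerializableFunction<String, String> passThrough = line -> line;
        System.out.println(writeKafkaTopicSketch(List.of("hello", "wayang"), passThrough, null));

        // For the (word, count) pairs of the WordCount plan, the formatter has to
        // render each pair as a String, here as "word,count".
        SerializableFunction<Map.Entry<String, Integer>, String> toCsv =
                pair -> pair.getKey() + "," + pair.getValue();
        System.out.println(writeKafkaTopicSketch(List.of(Map.entry("wayang", 2)), toCsv, null));
    }
}
```

If a real cost estimate is wanted instead of `null`, the Word2NVec.scala example Juri linked shows how a LoadProfileEstimator is built from a configuration.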