Hey Mirko,

it looks to me like these inputs are needed for formatting the data you write (via a UDF, a lambda, or similar) and for instantiating a LoadProfileEstimator (see https://github.com/apache/incubator-wayang/blob/ba23d214d93025898e5946b44438abb9e8d1fd32/wayang-benchmark/src/main/scala/org/apache/wayang/apps/simwords/Word2NVec.scala#L79 for how to do this from a config). In your code (https://github.com/apache/incubator-wayang/blob/cd7b6ea94d12c733cd23780f0b290178e44aab0e/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala#L785), the LoadProfileEstimator is defined as optional. Have you tried running .writeKafkaTopic with a lambda that passes the input through unchanged?
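To make the pass-through suggestion concrete, here is a minimal, self-contained Java sketch of what a formatter UDF could look like. It assumes the formatter is a serializable single-argument function that turns each record into the String written to the topic. `SerializableFunction` and `Pair` are hypothetical stand-ins defined locally, not Wayang's own classes, so the actual writeKafkaTopic signature should be checked before reusing this. `PASS_THROUGH` corresponds to "passing the input as is", while `CSV` shows an explicit format for the (word, count) tuples.

```java
import java.io.Serializable;
import java.util.function.Function;

/**
 * Sketch only: hypothetical stand-ins (NOT Wayang classes) illustrating a
 * formatter UDF, i.e. a serializable function mapping each record to the
 * String value that would be written to the Kafka topic.
 */
public class FormatterSketch {

    // Assumption: mirrors the shape of a serializable single-argument UDF.
    public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}

    // Hypothetical minimal pair type standing in for a (word, count) tuple.
    public static final class Pair<A, B> implements Serializable {
        final A field0;
        final B field1;

        public Pair(A field0, B field1) {
            this.field0 = field0;
            this.field1 = field1;
        }

        @Override
        public String toString() {
            return "(" + field0 + ", " + field1 + ")";
        }
    }

    // Pass-through formatter: emits the record's own string representation unchanged.
    public static final SerializableFunction<Object, String> PASS_THROUGH =
            record -> String.valueOf(record);

    // Explicit formatter: renders a (word, count) pair as "word,count".
    public static final SerializableFunction<Pair<String, Integer>, String> CSV =
            pair -> pair.field0 + "," + pair.field1;

    public static void main(String[] args) {
        Pair<String, Integer> record = new Pair<>("wayang", 3);
        System.out.println(PASS_THROUGH.apply(record)); // prints "(wayang, 3)"
        System.out.println(CSV.apply(record));          // prints "wayang,3"
    }
}
```

In the example further down, a lambda of this shape would take the place of `FORMATTER_UDF = null`; the load profile estimator is not covered by this sketch.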
Best,
Juri

________________________________
From: Mirko Kämpf <mirko.kae...@gmail.com>
Sent: 12 February 2024 09:37
To: dev@wayang.apache.org <dev@wayang.apache.org>
Subject: Next steps towards KafkaTopicSink

Hey Wayang team,

I have added the KafkaTopicSink, including the mappings, and the code already compiles, which is good for a Monday morning session ;-)

Here is my question: in the example code, which I have taken from my WordCount example, I need FORMATTER_UDF and UDF_LOAD_PROFILE_ESTIMATOR in order to call the new writeKafkaTopic(...) function. Can somebody please point me to a place in the code or docs where I can read how to instantiate those objects?

Many thanks, and have a great start to the week.

Cheers,
Mirko

    // Placeholders for the formatter and estimator arguments of writeKafkaTopic(...).
    Object FORMATTER_UDF = null;
    Object UDF_LOAD_PROFILE_ESTIMATOR = null;

    // Get a plan builder.
    WayangContext wayangContext = new WayangContext(new Configuration())
            .withPlugin(Java.basicPlugin());
            // .withPlugin(Spark.basicPlugin());
    JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
            .withJobName(String.format("WordCount (%s)", topicName))
            .withUdfJarOf(KafkaTopicWordCount.class);

    // Start building the WayangPlan.
    Collection<Tuple2<String, Integer>> wordcounts = planBuilder
            // Read the records from the Kafka topic.
            .readKafkaTopic(topicName).withName("Load data from topic")

            // Split each line by non-word characters.
            .flatMap(line -> Arrays.asList(line.split("\\W+")))
            .withSelectivity(10, 100, 0.9)
            .withName("Split words")

            // Filter empty tokens.
            .filter(token -> !token.isEmpty())
            .withSelectivity(0.99, 0.99, 0.99)
            .withName("Filter empty words")

            // Attach a counter to each word.
            .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

            // Sum up the counters for every word.
            .reduceByKey(
                    Tuple2::getField0,
                    (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
            )
            .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
            .withName("Add counters")

            // Execute the plan and collect the results.
            //.collect();
            .writeKafkaTopic("test", FORMATTER_UDF, "job_test_1", UDF_LOAD_PROFILE_ESTIMATOR);

--
Dr. rer. nat. Mirko Kämpf
Müchelner Str. 23
06259 Frankleben