Hey Wayang team, I have added the KafkaTopicSink incl. Mappings and the code compiles already, which is good for a Monday morning session ;-)
Here is my question: In the example code which I have taken out of my WordCount Example, I need FORMATTER_UDF and UDF_LOAD_PROFILE_ESTIMATOR in order to call the new writeKafkaTopic(...) function. Can somebody please provide me with a pointer to a place in the code or docs, where I can read how to instantiate those objects? Many thanks, and have a creat start into the week. Cheers, Mirk Object FORMATTER_UDF = null; Object UDF_LOAD_PROFILE_ESTIMATOR = null; // Get a plan builder. WayangContext wayangContext = new WayangContext(new Configuration()) .withPlugin(Java.basicPlugin()); // .withPlugin(Spark.basicPlugin()); JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext) .withJobName(String.format("WordCount (%s)", topicName)) .withUdfJarOf(KafkaTopicWordCount.class); // Start building the WayangPlan. Collection<Tuple2<String, Integer>> wordcounts = planBuilder // Read the text file. .readKafkaTopic(topicName).withName("Load data from topic") // Split each line by non-word characters. .flatMap(line -> Arrays.asList(line.split("\\W+"))) .withSelectivity(10, 100, 0.9) .withName("Split words") // Filter empty tokens. .filter(token -> !token.isEmpty()) .withSelectivity(0.99, 0.99, 0.99) .withName("Filter empty words") // Attach counter to each word. .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter") // Sum up counters for every word. .reduceByKey( Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()) ) .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0]))) .withName("Add counters") // Execute the plan and collect the results. //.collect(); .writeKafkaTopic("test", FORMATTER_UDF, "job_test_1", UDF_LOAD_PROFILE_ESTIMATOR ); -- Dr. rer. nat. Mirko Kämpf Müchelner Str. 23 06259 Frankleben