Hey Wayang team,

I have added the KafkaTopicSink incl. Mappings and the code compiles
already, which is good for a Monday morning session ;-)

Here is my question:

In the example code which I have taken out of my WordCount Example, I need
FORMATTER_UDF and UDF_LOAD_PROFILE_ESTIMATOR in order to
call the new writeKafkaTopic(...) function.

Can somebody please provide me with a pointer to a place in the code or
docs, where I can read how to instantiate those objects?

Many thanks,
and have a creat start into the week.

Cheers,
Mirk

Object FORMATTER_UDF = null;
Object UDF_LOAD_PROFILE_ESTIMATOR = null;

// Get a plan builder.
WayangContext wayangContext = new WayangContext(new Configuration())
        .withPlugin(Java.basicPlugin());
//        .withPlugin(Spark.basicPlugin());
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
        .withJobName(String.format("WordCount (%s)", topicName))
        .withUdfJarOf(KafkaTopicWordCount.class);

// Start building the WayangPlan.
Collection<Tuple2<String, Integer>> wordcounts = planBuilder
        // Read the text file.
        .readKafkaTopic(topicName).withName("Load data from topic")

        // Split each line by non-word characters.
        .flatMap(line -> Arrays.asList(line.split("\\W+")))
        .withSelectivity(10, 100, 0.9)
        .withName("Split words")

        // Filter empty tokens.
        .filter(token -> !token.isEmpty())
        .withSelectivity(0.99, 0.99, 0.99)
        .withName("Filter empty words")

        // Attach counter to each word.
        .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To
lower case, add counter")

        // Sum up counters for every word.
        .reduceByKey(
                Tuple2::getField0,
                (t1, t2) -> new Tuple2<>(t1.getField0(),
t1.getField1() + t2.getField1())
        )
        .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9,
1, false, in -> Math.round(0.01 * in[0])))
        .withName("Add counters")

        // Execute the plan and collect the results.
        //.collect();
        .writeKafkaTopic("test", FORMATTER_UDF, "job_test_1",
UDF_LOAD_PROFILE_ESTIMATOR );


-- 

Dr. rer. nat. Mirko Kämpf
Müchelner Str. 23
06259 Frankleben

Reply via email to