kamir commented on PR #403:
URL: https://github.com/apache/incubator-wayang/pull/403#issuecomment-1939480791
KafkaSource and KafkaSink are ready.
I will add tests later this week, or perhaps next weekend.
In the meantime, my external KafkaWordCountDemo already works (code below):
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.java.Java;

import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
public class KafkaTopicWordCount {
// Define the lambda function for formatting the output
private static final
FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> udf =
tuple -> {
return tuple.getField0() + ": " + tuple.getField1();
};
public static void main(String[] args){
System.out.println( ">>> Apache Wayang Test #01");
System.out.println( " We use a Kafka topic and a 'Java
Context'.");
// Default topic name
String topicName = "banking-tx-small-csv";
// Check if at least one argument is provided
if (args.length > 0) {
// Assuming the first argument is the topic name
topicName = args[0];
int i = 0;
for (String arg : args) {
String line = String.format( " %d - %s", i,arg);
System.out.println(line);
i=i+1;
}
}
else {
System.out.println( "*** Use default topic name: " + topicName );
}
Configuration configuration = new Configuration();
// Get a plan builder.
WayangContext wayangContext = new WayangContext(configuration)
.withPlugin(Java.basicPlugin());
// .withPlugin(Spark.basicPlugin());
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName(String.format("WordCount (%s)", topicName))
.withUdfJarOf(KafkaTopicWordCount.class);
/*
// Start building the WayangPlan.
Collection<Tuple2<String, Integer>> wordcounts_collection =
planBuilder
// Read the text file.
.readKafkaTopic(topicName).withName("Load data from topic")
// Split each line by non-word characters.
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.withSelectivity(10, 100, 0.9)
.withName("Split words")
// Filter empty tokens.
.filter(token -> !token.isEmpty())
.withSelectivity(0.99, 0.99, 0.99)
.withName("Filter empty words")
// Attach counter to each word.
.map(word -> new Tuple2<>(word.toLowerCase(),
1)).withName("To lower case, add counter")
// Sum up counters for every word.
.reduceByKey(
Tuple2::getField0,
(t1, t2) -> new Tuple2<>(t1.getField0(),
t1.getField1() + t2.getField1())
)
.withCardinalityEstimator(new
DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
.withName("Add counters")
// Execute the plan and collect the results.
.collect();
System.out.println( wordcounts_collection );
System.out.println( "### Done. ###" );
*/
// Start building the WayangPlan.
planBuilder
// Read the text file.
.readKafkaTopic(topicName).withName("Load data from topic")
// Split each line by non-word characters.
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.withSelectivity(10, 100, 0.9)
.withName("Split words")
// Filter empty tokens.
.filter(token -> !token.isEmpty())
.withSelectivity(0.99, 0.99, 0.99)
.withName("Filter empty words")
// Attach counter to each word.
.map(word -> new Tuple2<>(word.toLowerCase(),
1)).withName("To lower case, add counter")
// Sum up counters for every word.
.reduceByKey(
Tuple2::getField0,
(t1, t2) -> new Tuple2<>(t1.getField0(),
t1.getField1() + t2.getField1())
)
.withCardinalityEstimator(new
DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
.withName("Add counters")
// Execute the plan and store the results in Kafka.
//.writeKafkaTopic("file:///Users/mkaempf/GITHUB.private/open-sustainability-data/bin/test_23456.txt",
d -> String.format("%.2f, %d", d.getField1(), d.getField0()), "job_test_1",
.writeKafkaTopic("test_23456", d -> String.format("%d, %s",
d.getField1(), d.getField0()), "job_test_1",
LoadProfileEstimators.createFromSpecification("wayang.java.kafkatopicsink.load",
configuration) );
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]