This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch fix-rsvp-meetup in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 8d40b4a4706b7123b8da099b149807fdbc32f30e Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Tue Oct 13 14:55:15 2020 -0700 Remove tyrus dependencies --- .../org/apache/pinot/tools/RealtimeQuickStart.java | 9 ++- .../pinot/tools/streams/MeetupRsvpStream.java | 73 +++++++++++++++++++++- 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java index 0af2b7c..23d06e6 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java @@ -38,6 +38,7 @@ import static org.apache.pinot.tools.Quickstart.printStatus; public class RealtimeQuickStart { private StreamDataServerStartable _kafkaStarter; + private File _dataFile; public static void main(String[] args) throws Exception { @@ -64,6 +65,12 @@ public class RealtimeQuickStart { com.google.common.base.Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, tableConfigFile); + _dataFile = new File(configDir, "meetupRsvp_data.json"); + resource = Quickstart.class.getClassLoader() + .getResource("examples/stream/meetupRsvp/sample_data/meetupRsvp_data.json"); + Preconditions.checkNotNull(resource); + FileUtils.copyURLToFile(resource, _dataFile); + QuickstartTableRequest request = new QuickstartTableRequest("meetupRsvp", schemaFile, tableConfigFile); final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); @@ -77,7 +84,7 @@ public class RealtimeQuickStart { _kafkaStarter.start(); _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(10)); printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(); + MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(_dataFile); meetupRSVPProvider.run(); printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); runner.startAll(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java index 86a89db..f42e897 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java @@ -20,7 +20,11 @@ package org.apache.pinot.tools.streams; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Properties; @@ -43,13 +47,73 @@ public class MeetupRsvpStream { private boolean keepPublishing = true; private ClientManager client; - public MeetupRsvpStream() + private FileInputStream _fileInputStream; + private BufferedReader _bufferedReader; + private File _dataFile; + + public MeetupRsvpStream(File dataFile) throws Exception { Properties properties = new Properties(); properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER); properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); properties.put("request.required.acks", "1"); producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); + _dataFile = dataFile; + } + + public void publishStream() + throws IOException { + + _fileInputStream = new FileInputStream(_dataFile); + _bufferedReader = new BufferedReader(new InputStreamReader(_fileInputStream, "UTF8")); + + String jsonString; + try { + while ((jsonString = _bufferedReader.readLine()) != null) { + if (!keepPublishing) { + return; + } + System.out.println(jsonString); + if (jsonString.isEmpty()) { + LOGGER.error("Json string is empty"); + System.out.println("Json string is empty"); + continue; + } + JsonNode messageJSON = JsonUtils.stringToJsonNode(jsonString); + ObjectNode extracted = JsonUtils.newObjectNode(); + + JsonNode venue = messageJSON.get("venue"); + if (venue != null) { + extracted.set("venue_name", venue.get("venue_name")); + } + + JsonNode event = messageJSON.get("event"); + if (event != null) { + extracted.set("event_name", event.get("event_name")); + extracted.set("event_id", event.get("event_id")); + extracted.set("event_time", event.get("time")); + } + + JsonNode group = messageJSON.get("group"); + if (group != null) { + extracted.set("group_city", group.get("group_city")); + extracted.set("group_country", group.get("group_country")); + extracted.set("group_id", group.get("group_id")); + extracted.set("group_name", group.get("group_name")); + extracted.set("group_lat", group.get("group_lat")); + extracted.set("group_lon", group.get("group_lon")); + } + + extracted.set("mtime", messageJSON.get("mtime")); + extracted.put("rsvp_count", 1); + + if (keepPublishing) { + producer.produce("meetupRSVPEvents", extracted.toString().getBytes(StandardCharsets.UTF_8)); + } + } + } catch (Exception e) { + LOGGER.error("encountered an error running the meetupRSVPEvents stream", e); + } } public void stopPublishing() { @@ -58,7 +122,9 @@ public class MeetupRsvpStream { client.shutdown(); } - public void run() { + public void run() + throws IOException { +// publishStream(); try { ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build(); client = ClientManager.createClient(); @@ -72,6 +138,7 @@ public class MeetupRsvpStream { @Override public void onMessage(String message) { try { + LOGGER.info("AAA: " + message); JsonNode messageJSON = JsonUtils.stringToJsonNode(message); ObjectNode extracted = JsonUtils.newObjectNode(); @@ -113,7 +180,7 @@ public class MeetupRsvpStream { LOGGER.error("found an event where data did not have all the fields, don't care about for quickstart", e); } } - }, cec, new URI("ws://stream.meetup.com/2/rsvps")); + }, cec, new URI("http://stream.meetup.com/2/rsvps")); } catch (Exception e) { LOGGER.error("encountered an error running the meetupRSVPEvents stream", e); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org