This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-rsvp-meetup
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 8d40b4a4706b7123b8da099b149807fdbc32f30e
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Oct 13 14:55:15 2020 -0700

    Remove tyrus dependencies
---
 .../org/apache/pinot/tools/RealtimeQuickStart.java |  9 ++-
 .../pinot/tools/streams/MeetupRsvpStream.java      | 73 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 0af2b7c..23d06e6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -38,6 +38,7 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
 
 public class RealtimeQuickStart {
   private StreamDataServerStartable _kafkaStarter;
+  private File _dataFile;
 
   public static void main(String[] args)
       throws Exception {
@@ -64,6 +65,12 @@ public class RealtimeQuickStart {
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
+    _dataFile = new File(configDir, "meetupRsvp_data.json");
+    resource = Quickstart.class.getClassLoader()
+        .getResource("examples/stream/meetupRsvp/sample_data/meetupRsvp_data.json");
+    Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, _dataFile);
+
     QuickstartTableRequest request = new QuickstartTableRequest("meetupRsvp", schemaFile, tableConfigFile);
     final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
 
@@ -77,7 +84,7 @@ public class RealtimeQuickStart {
     _kafkaStarter.start();
     _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(10));
     printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
-    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
+    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(_dataFile);
     meetupRSVPProvider.run();
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
     runner.startAll();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index 86a89db..f42e897 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -20,7 +20,11 @@ package org.apache.pinot.tools.streams;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Properties;
@@ -43,13 +47,73 @@ public class MeetupRsvpStream {
   private boolean keepPublishing = true;
   private ClientManager client;
 
-  public MeetupRsvpStream()
+  private FileInputStream _fileInputStream;
+  private BufferedReader _bufferedReader;
+  private File _dataFile;
+
+  public MeetupRsvpStream(File dataFile)
       throws Exception {
     Properties properties = new Properties();
     properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
     producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+    _dataFile = dataFile;
+  }
+
+  public void publishStream()
+      throws IOException {
+
+    _fileInputStream = new FileInputStream(_dataFile);
+    _bufferedReader = new BufferedReader(new InputStreamReader(_fileInputStream, "UTF8"));
+
+    String jsonString;
+    try {
+      while ((jsonString = _bufferedReader.readLine()) != null) {
+        if (!keepPublishing) {
+          return;
+        }
+        System.out.println(jsonString);
+        if (jsonString.isEmpty()) {
+          LOGGER.error("Json string is empty");
+          System.out.println("Json string is empty");
+          continue;
+        }
+        JsonNode messageJSON = JsonUtils.stringToJsonNode(jsonString);
+        ObjectNode extracted = JsonUtils.newObjectNode();
+
+        JsonNode venue = messageJSON.get("venue");
+        if (venue != null) {
+          extracted.set("venue_name", venue.get("venue_name"));
+        }
+
+        JsonNode event = messageJSON.get("event");
+        if (event != null) {
+          extracted.set("event_name", event.get("event_name"));
+          extracted.set("event_id", event.get("event_id"));
+          extracted.set("event_time", event.get("time"));
+        }
+
+        JsonNode group = messageJSON.get("group");
+        if (group != null) {
+          extracted.set("group_city", group.get("group_city"));
+          extracted.set("group_country", group.get("group_country"));
+          extracted.set("group_id", group.get("group_id"));
+          extracted.set("group_name", group.get("group_name"));
+          extracted.set("group_lat", group.get("group_lat"));
+          extracted.set("group_lon", group.get("group_lon"));
+        }
+
+        extracted.set("mtime", messageJSON.get("mtime"));
+        extracted.put("rsvp_count", 1);
+
+        if (keepPublishing) {
+          producer.produce("meetupRSVPEvents", extracted.toString().getBytes(StandardCharsets.UTF_8));
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("encountered an error running the meetupRSVPEvents stream", e);
+    }
   }
 
   public void stopPublishing() {
@@ -58,7 +122,9 @@ public class MeetupRsvpStream {
     client.shutdown();
   }
 
-  public void run() {
+  public void run()
+      throws IOException {
+//    publishStream();
     try {
       ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
       client = ClientManager.createClient();
@@ -72,6 +138,7 @@ public class MeetupRsvpStream {
               @Override
               public void onMessage(String message) {
                 try {
+                  LOGGER.info("AAA: " + message);
                   JsonNode messageJSON = JsonUtils.stringToJsonNode(message);
                   ObjectNode extracted = JsonUtils.newObjectNode();
 
@@ -113,7 +180,7 @@ public class MeetupRsvpStream {
             LOGGER.error("found an event where data did not have all the fields, don't care about for quickstart", e);
           }
         }
-      }, cec, new URI("ws://stream.meetup.com/2/rsvps"));
+      }, cec, new URI("http://stream.meetup.com/2/rsvps"));
     } catch (Exception e) {
       LOGGER.error("encountered an error running the meetupRSVPEvents stream", e);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to