This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ed36ff  Remove hadoop dependency in Create Segment Command (#5271)
4ed36ff is described below

commit 4ed36ff6653c53a8780e9a9d68b370561c251f68
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Tue Apr 21 00:15:33 2020 -0700

    Remove hadoop dependency in Create Segment Command (#5271)
    
    * Remove hadoop dependency in Create Segment Command
    
    * move gson to fasterxml for github events quickstart
    
    * Address comments
---
 .../pinot/spi/filesystem/PinotFSFactory.java       |  11 +-
 pinot-tools/pom.xml                                |   5 -
 .../tools/admin/command/CreateSegmentCommand.java  |  74 +++-----
 .../githubevents/PullRequestMergedEvent.java       | 209 +++++++++------------
 .../PullRequestMergedEventsStream.java             |  73 ++++---
 5 files changed, 161 insertions(+), 211 deletions(-)

diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index 38d0eff..97c1fd0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -39,8 +39,12 @@ public class PinotFSFactory {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class);
   private static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final String CLASS = "class";
+  private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() {
+    {
+      put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS());
+    }
+  };
 
-  private static Map<String, PinotFS> PINOT_FS_MAP = new HashMap<>();
 
   public static void register(String scheme, String fsClassName, Configuration configuration) {
     try {
@@ -66,11 +70,6 @@ public class PinotFSFactory {
       LOGGER.info("Got scheme {}, classname {}, starting to initialize", key, 
fsClassName);
       register(key, fsClassName, fsConfig.subset(key));
     }
-
-    if (!PINOT_FS_MAP.containsKey(LOCAL_PINOT_FS_SCHEME)) {
-      LOGGER.info("LocalPinotFS not configured, adding as default");
-      PINOT_FS_MAP.put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS());
-    }
   }
 
   public static PinotFS create(String scheme) {
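
With LocalPinotFS pre-registered in the static initializer above, the "file"
scheme now resolves without any prior register() or init() call. A minimal
sketch of the effect (the class name below is illustrative, not part of this
commit):

    // Illustrative caller: the "file" scheme is served by the LocalPinotFS
    // instance placed in PINOT_FS_MAP by the static initializer above.
    import java.net.URI;
    import org.apache.pinot.spi.filesystem.PinotFS;
    import org.apache.pinot.spi.filesystem.PinotFSFactory;

    public class LocalFsDefaultSketch {
      public static void main(String[] args) throws Exception {
        PinotFS fs = PinotFSFactory.create("file"); // no register() needed for local files
        System.out.println(fs.isDirectory(new URI("file:///tmp")));
      }
    }
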
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 2fbc384..0f3310f 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -83,11 +83,6 @@
       <scope>runtime</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
     </dependency>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 6041dc8..8424c82 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -21,17 +21,12 @@ package org.apache.pinot.tools.admin.command;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -42,6 +37,8 @@ import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
@@ -269,22 +266,25 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     }
 
     // Filter out all input files.
-    final Path dataDirPath = new Path(_dataDir);
-    FileSystem fileSystem = FileSystem.get(URI.create(_dataDir), new Configuration());
+    URI dataDirURI = URI.create(_dataDir);
+    if (dataDirURI.getScheme() == null) {
+      dataDirURI = new File(_dataDir).toURI();
+    }
+    PinotFS pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
 
-    if (!fileSystem.exists(dataDirPath) || !fileSystem.isDirectory(dataDirPath)) {
+    if (!pinotFS.exists(dataDirURI) || !pinotFS.isDirectory(dataDirURI)) {
       throw new RuntimeException("Data directory " + _dataDir + " not found.");
     }
 
     // Gather all data files
-    List<Path> dataFilePaths = getDataFilePaths(dataDirPath);
+    String[] dataFilePaths = pinotFS.listFiles(dataDirURI, true);
 
-    if ((dataFilePaths == null) || (dataFilePaths.size() == 0)) {
+    if ((dataFilePaths == null) || (dataFilePaths.length == 0)) {
       throw new RuntimeException(
           "Data directory " + _dataDir + " does not contain " + _format.toString().toUpperCase() + " files.");
     }
 
-    LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths.toArray()));
+    LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths));
 
     // Make sure output directory does not already exist, or can be overwritten.
     File outDir = new File(_outDir);
@@ -325,20 +325,25 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
 
     ExecutorService executor = Executors.newFixedThreadPool(_numThreads);
     int cnt = 0;
-    for (final Path dataFilePath : dataFilePaths) {
+    for (final String dataFilePath : dataFilePaths) {
       final int segCnt = cnt;
 
       executor.execute(new Runnable() {
         @Override
         public void run() {
           for (int curr = 0; curr <= _retry; curr++) {
+            File localDir = new File(UUID.randomUUID().toString());
             try {
              SegmentGeneratorConfig config = new SegmentGeneratorConfig(segmentGeneratorConfig);
-
-              String localFile = dataFilePath.getName();
-              Path localFilePath = new Path(localFile);
-              dataDirPath.getFileSystem(new Configuration()).copyToLocalFile(dataFilePath, localFilePath);
-              config.setInputFilePath(localFile);
+              URI dataFileUri = URI.create(dataFilePath);
+              String[] splits = dataFilePath.split("/");
+              String fileName = splits[splits.length - 1];
+              if (!isDataFile(fileName)) {
+                return;
+              }
+              File localFile = new File(localDir, fileName);
+              pinotFS.copyToLocalFile(dataFileUri, localFile);
+              config.setInputFilePath(localFile.getAbsolutePath());
               config.setSegmentName(_segmentName + "_" + segCnt);
               Schema schema = Schema.fromFile(new File(_schemaFile));
               config.setSchema(schema);
@@ -359,7 +364,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
                   if (_readerConfigFile != null) {
                     readerConfig = JsonUtils.fileToObject(new File(_readerConfigFile), CSVRecordReaderConfig.class);
                   }
-                  csvRecordReader.init(new File(localFile), schema, readerConfig);
+                  csvRecordReader.init(localFile, schema, readerConfig);
                   driver.init(config, csvRecordReader);
                   break;
                 default:
@@ -381,6 +386,8 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
               } else {
                 LOGGER.error("Failed to create Pinot segment, retry: {}/{}", 
curr + 1, _retry);
               }
+            } finally {
+              FileUtils.deleteQuietly(localDir);
             }
           }
         }
@@ -398,9 +405,10 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     try {
       try {
         localTempDir.getParentFile().mkdirs();
-        FileSystem.get(URI.create(indexDir.toString()), new Configuration())
-            .copyToLocalFile(new Path(indexDir.toString()), new Path(localTempDir.toString()));
-      } catch (IOException e) {
+        URI indexDirUri = URI.create(indexDir.toString());
+        PinotFS pinotFs = PinotFSFactory.create(indexDirUri.getScheme());
+        pinotFs.copyToLocalFile(indexDirUri, localTempDir);
+      } catch (Exception e) {
        LOGGER.error("Failed to copy segment {} to local directory {} for verification.", indexDir, localTempDir, e);
         return false;
       }
@@ -419,28 +427,6 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     }
   }
 
-  protected List<Path> getDataFilePaths(Path pathPattern)
-      throws IOException {
-    List<Path> tarFilePaths = new ArrayList<>();
-    FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new Configuration());
-    getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), tarFilePaths);
-    return tarFilePaths;
-  }
-
-  protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths)
-      throws IOException {
-    for (FileStatus fileStatus : fileStatuses) {
-      Path path = fileStatus.getPath();
-      if (fileStatus.isDirectory()) {
-        getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
-      } else {
-        if (isDataFile(path.getName())) {
-          tarFilePaths.add(path);
-        }
-      }
-    }
-  }
-
   protected boolean isDataFile(String fileName) {
     switch (_format) {
       case AVRO:
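
The scheme fallback above is the heart of the Hadoop removal: a bare local path
carries no URI scheme, so it is mapped to a file:// URI before the
PinotFSFactory lookup, while fully-qualified URIs pass through to their
registered filesystem. A standalone sketch of that pattern (class and method
names are illustrative, not Pinot code):

    import java.io.File;
    import java.net.URI;

    public class SchemeFallbackSketch {
      // Mirrors the dataDirURI handling in CreateSegmentCommand above.
      static URI resolveDataDir(String dataDir) {
        URI uri = URI.create(dataDir);
        if (uri.getScheme() == null) {      // "/data/input" has no scheme
          uri = new File(dataDir).toURI();  // becomes "file:///data/input"
        }
        return uri;                         // "s3://bucket/input" is returned unchanged
      }

      public static void main(String[] args) {
        System.out.println(resolveDataDir("/data/input"));
        System.out.println(resolveDataDir("s3://bucket/input"));
      }
    }
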
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java
index 2292c04..c9a069e 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java
@@ -18,13 +18,12 @@
  */
 package org.apache.pinot.tools.streams.githubevents;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.Lists;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import org.joda.time.format.DateTimeFormatter;
@@ -39,43 +38,43 @@ public class PullRequestMergedEvent {
   private static final DateTimeFormatter DATE_FORMATTER = ISODateTimeFormat.dateTimeNoMillis();
 
   // dimensions
-  private String _title;
-  private List<String> _labels;
-  private String _userId;
-  private String _userType;
-  private String _authorAssociation;
-  private String _mergedBy;
-  private List<String> _assignees;
-  private List<String> _committers;
-  private List<String> _authors;
-  private List<String> _reviewers;
-  private List<String> _commenters;
-  private List<String> _requestedReviewers;
-  private List<String> _requestedTeams;
-  private String _repo;
-  private String _organization;
+  private final String _title;
+  private final List<String> _labels;
+  private final String _userId;
+  private final String _userType;
+  private final String _authorAssociation;
+  private final String _mergedBy;
+  private final List<String> _assignees;
+  private final List<String> _committers;
+  private final List<String> _authors;
+  private final List<String> _reviewers;
+  private final List<String> _commenters;
+  private final List<String> _requestedReviewers;
+  private final List<String> _requestedTeams;
+  private final String _repo;
+  private final String _organization;
 
   // metrics
-  private long _numComments;
-  private long _numReviewComments;
-  private long _numCommits;
-  private long _numLinesAdded;
-  private long _numLinesDeleted;
-  private long _numFilesChanged;
-  private long _numReviewers;
-  private long _numCommenters;
-  private long _numCommitters;
-  private long _numAuthors;
-  private long _createdTimeMillis;
-  private long _elapsedTimeMillis;
+  private final long _numComments;
+  private final long _numReviewComments;
+  private final long _numCommits;
+  private final long _numLinesAdded;
+  private final long _numLinesDeleted;
+  private final long _numFilesChanged;
+  private final long _numReviewers;
+  private final long _numCommenters;
+  private final long _numCommitters;
+  private final long _numAuthors;
+  private final long _createdTimeMillis;
+  private final long _elapsedTimeMillis;
 
   // time
-  private long _mergedTimeMillis;
+  private final long _mergedTimeMillis;
 
-  private JsonObject _pullRequest;
-  private JsonArray _commitsArray;
-  private JsonArray _reviewCommentsArray;
-  private JsonArray _commentsArray;
+  private final JsonNode _pullRequest;
+  private final JsonNode _commitsArray;
+  private final JsonNode _reviewCommentsArray;
+  private final JsonNode _commentsArray;
 
   /**
    * Construct a PullRequestMergedEvent from the github event of type PullRequestEvent which is also merged and closed.
@@ -85,23 +84,23 @@ public class PullRequestMergedEvent {
    * @param reviewComments - review comments data corresponding to the event
    * @param comments - comments data corresponding to the event
    */
-  public PullRequestMergedEvent(JsonObject event, JsonArray commits, JsonArray reviewComments, JsonArray comments) {
+  public PullRequestMergedEvent(JsonNode event, JsonNode commits, JsonNode reviewComments, JsonNode comments) {
 
-    JsonObject payload = event.get("payload").getAsJsonObject();
-    _pullRequest = payload.get("pull_request").getAsJsonObject();
+    JsonNode payload = event.get("payload");
+    _pullRequest = payload.get("pull_request");
     _commitsArray = commits;
     _reviewCommentsArray = reviewComments;
     _commentsArray = comments;
 
     // Dimensions
-    _title = _pullRequest.get("title").getAsString();
+    _title = _pullRequest.get("title").asText();
     _labels = extractLabels();
-    JsonObject user = _pullRequest.get("user").getAsJsonObject();
-    _userId = user.get("login").getAsString();
-    _userType = user.get("type").getAsString();
-    _authorAssociation = _pullRequest.get("author_association").getAsString();
-    JsonObject mergedBy = _pullRequest.get("merged_by").getAsJsonObject();
-    _mergedBy = mergedBy.get("login").getAsString();
+    JsonNode user = _pullRequest.get("user");
+    _userId = user.get("login").asText();
+    _userType = user.get("type").asText();
+    _authorAssociation = _pullRequest.get("author_association").asText();
+    JsonNode mergedBy = _pullRequest.get("merged_by");
+    _mergedBy = mergedBy.get("login").asText();
     _assignees = extractAssignees();
     _committers = Lists.newArrayList(extractCommitters());
     _authors = Lists.newArrayList(extractAuthors());
@@ -109,26 +108,26 @@ public class PullRequestMergedEvent {
     _commenters = Lists.newArrayList(extractCommenters());
     _requestedReviewers = extractRequestedReviewers();
     _requestedTeams = extractRequestedTeams();
-    JsonObject repo = event.get("repo").getAsJsonObject();
-    String[] repoName = repo.get("name").getAsString().split("/");
+    JsonNode repo = event.get("repo");
+    String[] repoName = repo.get("name").asText().split("/");
     _repo = repoName[1];
     _organization = repoName[0];
 
     // Metrics
-    _numComments = _pullRequest.get("comments").getAsInt();
-    _numReviewComments = _pullRequest.get("review_comments").getAsInt();
-    _numCommits = _pullRequest.get("commits").getAsInt();
-    _numLinesAdded = _pullRequest.get("additions").getAsInt();
-    _numLinesDeleted = _pullRequest.get("deletions").getAsInt();
-    _numFilesChanged = _pullRequest.get("changed_files").getAsInt();
+    _numComments = _pullRequest.get("comments").asInt();
+    _numReviewComments = _pullRequest.get("review_comments").asInt();
+    _numCommits = _pullRequest.get("commits").asInt();
+    _numLinesAdded = _pullRequest.get("additions").asInt();
+    _numLinesDeleted = _pullRequest.get("deletions").asInt();
+    _numFilesChanged = _pullRequest.get("changed_files").asInt();
     _numReviewers = _reviewers.size();
     _numCommenters = _commenters.size();
     _numCommitters = _committers.size();
     _numAuthors = _authors.size();
 
     // Time
-    _createdTimeMillis = DATE_FORMATTER.parseMillis(_pullRequest.get("created_at").getAsString());
-    _mergedTimeMillis = DATE_FORMATTER.parseMillis(_pullRequest.get("merged_at").getAsString());
+    _createdTimeMillis = DATE_FORMATTER.parseMillis(_pullRequest.get("created_at").asText());
+    _mergedTimeMillis = DATE_FORMATTER.parseMillis(_pullRequest.get("merged_at").asText());
     _elapsedTimeMillis = _mergedTimeMillis - _createdTimeMillis;
   }
 
@@ -137,10 +136,10 @@ public class PullRequestMergedEvent {
    */
   private Set<String> extractReviewers() {
     Set<String> reviewers;
-    if (_reviewCommentsArray != null && !_reviewCommentsArray.isJsonNull() & _reviewCommentsArray.size() > 0) {
+    if (_reviewCommentsArray != null && _reviewCommentsArray.size() > 0) {
       reviewers = new HashSet<>();
-      for (JsonElement reviewComment : _reviewCommentsArray) {
-        reviewers.add(reviewComment.getAsJsonObject().get("user").getAsJsonObject().get("login").getAsString());
+      for (JsonNode reviewComment : _reviewCommentsArray) {
+        reviewers.add(reviewComment.get("user").get("login").asText());
       }
     } else {
       reviewers = Collections.emptySet();
@@ -153,10 +152,10 @@ public class PullRequestMergedEvent {
    */
   private Set<String> extractCommenters() {
     Set<String> commenters;
-    if (_commentsArray != null && !_commentsArray.isJsonNull() & _commentsArray.size() > 0) {
+    if (_commentsArray != null && _commentsArray.size() > 0) {
       commenters = new HashSet<>();
-      for (JsonElement comment : _commentsArray) {
-        commenters.add(comment.getAsJsonObject().get("user").getAsJsonObject().get("login").getAsString());
+      for (JsonNode comment : _commentsArray) {
+        commenters.add(comment.get("user").get("login").asText());
       }
     } else {
       commenters = Collections.emptySet();
@@ -169,17 +168,15 @@ public class PullRequestMergedEvent {
    */
   private Set<String> extractCommitters() {
     Set<String> committers;
-    if (_commitsArray != null && !_commitsArray.isJsonNull() & _commitsArray.size() > 0) {
+    if (_commitsArray != null && _commitsArray.size() > 0) {
       committers = new HashSet<>();
-      for (JsonElement commit : _commitsArray) {
-        JsonObject commitAsJsonObject = commit.getAsJsonObject();
-        JsonElement committer = commitAsJsonObject.get("committer");
-        if (committer.isJsonNull()) {
-          committers.add(
-              commitAsJsonObject.get("commit").getAsJsonObject().get("committer").getAsJsonObject().get("name")
-                  .getAsString());
+      for (JsonNode commit : _commitsArray) {
+        JsonNode commitAsJsonNode = commit;
+        JsonNode committer = commitAsJsonNode.get("committer");
+        if (committer.size() == 0) {
+          committers.add(commitAsJsonNode.get("commit").get("committer").get("name").asText());
         } else {
-          committers.add(committer.getAsJsonObject().get("login").getAsString());
+          committers.add(committer.get("login").asText());
         }
       }
     } else {
@@ -193,16 +190,15 @@ public class PullRequestMergedEvent {
    */
   private Set<String> extractAuthors() {
     Set<String> authors;
-    if (_commitsArray != null && !_commitsArray.isJsonNull() & _commitsArray.size() > 0) {
+    if (_commitsArray != null && _commitsArray.size() > 0) {
       authors = new HashSet<>();
-      for (JsonElement commit : _commitsArray) {
-        JsonObject commitAsJsonObject = commit.getAsJsonObject();
-        JsonElement author = commitAsJsonObject.get("author");
-        if (author.isJsonNull()) {
-          authors.add(commitAsJsonObject.get("commit").getAsJsonObject().get("author").getAsJsonObject().get("name")
-              .getAsString());
+      for (JsonNode commit : _commitsArray) {
+        JsonNode commitAsJsonNode = commit;
+        JsonNode author = commitAsJsonNode.get("author");
+        if (author.size() == 0) {
+          authors.add(commitAsJsonNode.get("commit").get("author").get("name").asText());
         } else {
-          authors.add(author.getAsJsonObject().get("login").getAsString());
+          authors.add(author.get("login").asText());
         }
       }
     } else {
@@ -215,16 +211,10 @@ public class PullRequestMergedEvent {
    * Extracts labels for the PR
    */
   private List<String> extractLabels() {
-    JsonArray labelsArray = _pullRequest.get("labels").getAsJsonArray();
-    List<String> labels;
-    int size = labelsArray.size();
-    if (size > 0) {
-      labels = new ArrayList<>(size);
-      for (JsonElement label : labelsArray) {
-        labels.add(label.getAsJsonObject().get("name").getAsString());
-      }
-    } else {
-      labels = Collections.emptyList();
+    Iterator<JsonNode> labelsIterator = _pullRequest.get("labels").elements();
+    List<String> labels = new ArrayList<>();
+    while (labelsIterator.hasNext()) {
+      labels.add(labelsIterator.next().get("name").asText());
     }
     return labels;
   }
@@ -233,16 +223,10 @@ public class PullRequestMergedEvent {
    * Extracts assignees for the PR
    */
   private List<String> extractAssignees() {
-    JsonArray assigneesJson = _pullRequest.get("assignees").getAsJsonArray();
-    int numAssignees = assigneesJson.size();
-    List<String> assignees;
-    if (numAssignees > 0) {
-      assignees = new ArrayList<>(numAssignees);
-      for (JsonElement reviewer : assigneesJson) {
-        assignees.add(reviewer.getAsJsonObject().get("login").getAsString());
-      }
-    } else {
-      assignees = Collections.emptyList();
+    Iterator<JsonNode> assigneesIterator = _pullRequest.get("assignees").elements();
+    List<String> assignees = new ArrayList<>();
+    while (assigneesIterator.hasNext()) {
+      assignees.add(assigneesIterator.next().get("login").asText());
     }
     return assignees;
   }
@@ -251,16 +235,10 @@ public class PullRequestMergedEvent {
    * Extracts list of requested reviewers
    */
   private List<String> extractRequestedReviewers() {
-    JsonArray requestedReviewersJson = _pullRequest.get("requested_reviewers").getAsJsonArray();
-    int numRequestedReviewers = requestedReviewersJson.size();
-    List<String> requestedReviewers;
-    if (numRequestedReviewers > 0) {
-      requestedReviewers = new ArrayList<>(numRequestedReviewers);
-      for (JsonElement reviewer : requestedReviewersJson) {
-        requestedReviewers.add(reviewer.getAsJsonObject().get("login").getAsString());
-      }
-    } else {
-      requestedReviewers = Collections.emptyList();
+    Iterator<JsonNode> requestedReviewersIterator = _pullRequest.get("requested_reviewers").elements();
+    List<String> requestedReviewers = new ArrayList<>();
+    while (requestedReviewersIterator.hasNext()) {
+      requestedReviewers.add(requestedReviewersIterator.next().get("login").asText());
     }
     return requestedReviewers;
   }
@@ -269,16 +247,11 @@ public class PullRequestMergedEvent {
    * Extracts list of review requested teams
    */
   private List<String> extractRequestedTeams() {
-    JsonArray requestedTeamsJson = _pullRequest.get("requested_teams").getAsJsonArray();
-    int numRequestedTeams = requestedTeamsJson.size();
-    List<String> requestedTeams;
-    if (numRequestedTeams > 0) {
-      requestedTeams = new ArrayList<>(numRequestedTeams);
-      for (JsonElement team : requestedTeamsJson) {
-        requestedTeams.add(team.getAsJsonObject().get("name").getAsString());
-      }
-    } else {
-      requestedTeams = Collections.emptyList();
+
+    Iterator<JsonNode> requestedTeamsIterator = _pullRequest.get("requested_teams").elements();
+    List<String> requestedTeams = new ArrayList<>();
+    while (requestedTeamsIterator.hasNext()) {
+      requestedTeams.add(requestedTeamsIterator.next().get("name").asText());
     }
     return requestedTeams;
   }
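
The translation in this file is mechanical: Gson's
getAsJsonObject()/getAsJsonArray() chains collapse into plain JsonNode.get(...)
calls, getAsString()/getAsInt() become asText()/asInt(), and array nodes are
iterated directly since JsonNode implements Iterable<JsonNode>. A minimal
sketch of the Jackson idioms (sample JSON and class name are illustrative):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JacksonTraversalSketch {
      public static void main(String[] args) throws Exception {
        String json = "{\"user\":{\"login\":\"octocat\"},\"labels\":[{\"name\":\"bug\"}]}";
        JsonNode root = new ObjectMapper().readTree(json);
        System.out.println(root.get("user").get("login").asText()); // octocat
        for (JsonNode label : root.get("labels")) {   // arrays iterate element by element
          System.out.println(label.get("name").asText()); // bug
        }
      }
    }
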
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
index d9866b4..c9d8f5f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
@@ -18,11 +18,8 @@
  */
 package org.apache.pinot.tools.streams.githubevents;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
@@ -36,6 +33,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Quickstart;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.slf4j.Logger;
@@ -54,12 +52,12 @@ public class PullRequestMergedEventsStream {
   private static final Logger LOGGER = LoggerFactory.getLogger(PullRequestMergedEventsStream.class);
   private static final long SLEEP_MILLIS = 10_000;
 
-  private ExecutorService _service;
+  private final ExecutorService _service;
   private boolean _keepStreaming = true;
 
-  private Schema _avroSchema;
-  private String _topicName;
-  private GitHubAPICaller _gitHubAPICaller;
+  private final Schema _avroSchema;
+  private final String _topicName;
+  private final GitHubAPICaller _gitHubAPICaller;
 
   private StreamDataProducer _producer;
 
@@ -93,6 +91,17 @@ public class PullRequestMergedEventsStream {
     _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
   }
 
+  public static void main(String[] args)
+      throws Exception {
+    String personalAccessToken = args[0];
+    String schemaFile = args[1];
+    String topic = "pullRequestMergedEvent";
+    PullRequestMergedEventsStream stream =
+        new PullRequestMergedEventsStream(schemaFile, topic, KafkaStarterUtils.DEFAULT_KAFKA_BROKER,
+            personalAccessToken);
+    stream.execute();
+  }
+
   /**
    * Starts the stream.
    * Adds shutdown hook.
@@ -150,9 +159,8 @@ public class PullRequestMergedEventsStream {
           switch (githubAPIResponse.statusCode) {
             case 200: // Read new events
               etag = githubAPIResponse.etag;
-              JsonArray jsonArray = new JsonParser().parse(githubAPIResponse.responseString).getAsJsonArray();
-
-              for (JsonElement eventElement : jsonArray) {
+              JsonNode jsonArray = JsonUtils.stringToJsonNode(githubAPIResponse.responseString);
+              for (JsonNode eventElement : jsonArray) {
                 try {
                   GenericRecord genericRecord = convertToPullRequestMergedGenericRecord(eventElement);
                   if (genericRecord != null) {
@@ -206,41 +214,41 @@ public class PullRequestMergedEventsStream {
   * Find commits, review comments, comments corresponding to this pull request event.
   * Construct a PullRequestMergedEvent with the help of the event, commits, review comments and comments.
   * Converts PullRequestMergedEvent to GenericRecord
+   * @param event
   */
-  private GenericRecord convertToPullRequestMergedGenericRecord(JsonElement eventJson)
+  private GenericRecord convertToPullRequestMergedGenericRecord(JsonNode event)
       throws IOException {
     GenericRecord genericRecord = null;
-    JsonObject event = eventJson.getAsJsonObject();
-    String type = event.get("type").getAsString();
+    String type = event.get("type").asText();
 
     if ("PullRequestEvent".equals(type)) {
-      JsonObject payload = event.get("payload").getAsJsonObject();
+      JsonNode payload = event.get("payload");
       if (payload != null) {
-        String action = payload.get("action").getAsString();
-        JsonObject pullRequest = payload.get("pull_request").getAsJsonObject();
-        String merged = pullRequest.get("merged").getAsString();
+        String action = payload.get("action").asText();
+        JsonNode pullRequest = payload.get("pull_request");
+        String merged = pullRequest.get("merged").asText();
         if ("closed".equals(action) && "true".equals(merged)) { // valid pull 
request merge event
 
-          JsonArray commits = null;
-          String commitsURL = pullRequest.get("commits_url").getAsString();
+          JsonNode commits = null;
+          String commitsURL = pullRequest.get("commits_url").asText();
           GitHubAPICaller.GitHubAPIResponse commitsResponse = _gitHubAPICaller.callAPI(commitsURL);
 
           if (commitsResponse.responseString != null) {
-            commits = new JsonParser().parse(commitsResponse.responseString).getAsJsonArray();
+            commits = JsonUtils.stringToJsonNode(commitsResponse.responseString);
           }
 
-          JsonArray reviewComments = null;
-          String reviewCommentsURL = pullRequest.get("review_comments_url").getAsString();
+          JsonNode reviewComments = null;
+          String reviewCommentsURL = pullRequest.get("review_comments_url").asText();
           GitHubAPICaller.GitHubAPIResponse reviewCommentsResponse = _gitHubAPICaller.callAPI(reviewCommentsURL);
           if (reviewCommentsResponse.responseString != null) {
-            reviewComments = new JsonParser().parse(reviewCommentsResponse.responseString).getAsJsonArray();
+            reviewComments = JsonUtils.stringToJsonNode(reviewCommentsResponse.responseString);
           }
 
-          JsonArray comments = null;
-          String commentsURL = pullRequest.get("comments_url").getAsString();
+          JsonNode comments = null;
+          String commentsURL = pullRequest.get("comments_url").asText();
           GitHubAPICaller.GitHubAPIResponse commentsResponse = _gitHubAPICaller.callAPI(commentsURL);
           if (commentsResponse.responseString != null) {
-            comments = new JsonParser().parse(commentsResponse.responseString).getAsJsonArray();
+            comments = JsonUtils.stringToJsonNode(commentsResponse.responseString);
           }
 
           // get PullRequestMergeEvent
@@ -296,15 +304,4 @@ public class PullRequestMergedEventsStream {
 
     return genericRecord;
   }
-
-  public static void main(String[] args)
-      throws Exception {
-    String personalAccessToken = args[0];
-    String schemaFile = args[1];
-    String topic = "pullRequestMergedEvent";
-    PullRequestMergedEventsStream stream =
-        new PullRequestMergedEventsStream(schemaFile, topic, KafkaStarterUtils.DEFAULT_KAFKA_BROKER,
-            personalAccessToken);
-    stream.execute();
-  }
 }
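
With main(...) relocated next to the constructor, the stream is directly
runnable. A sketch of an equivalent programmatic launcher, assuming the same
constructor arguments as main(...) above (the schema path and token are
placeholders, not real values):

    import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
    import org.apache.pinot.tools.utils.KafkaStarterUtils;

    public class GithubEventsLauncherSketch {
      public static void main(String[] args) throws Exception {
        PullRequestMergedEventsStream stream =
            new PullRequestMergedEventsStream("/path/to/pullRequestMergedEvent.avsc",
                "pullRequestMergedEvent", KafkaStarterUtils.DEFAULT_KAFKA_BROKER,
                "<github-personal-access-token>");
        stream.execute();
      }
    }
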

