This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4ed36ff Remove hadoop dependency in Create Segment Command (#5271)
4ed36ff is described below
commit 4ed36ff6653c53a8780e9a9d68b370561c251f68
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Apr 21 00:15:33 2020 -0700
Remove hadoop dependency in Create Segment Command (#5271)
* Remove hadoop dependency in Create Segment Command
* move gson to fasterxml for github events quickstart
* Address comments
---
.../pinot/spi/filesystem/PinotFSFactory.java | 11 +-
pinot-tools/pom.xml | 5 -
.../tools/admin/command/CreateSegmentCommand.java | 74 +++-----
.../githubevents/PullRequestMergedEvent.java | 209 +++++++++------------
.../PullRequestMergedEventsStream.java | 73 ++++---
5 files changed, 161 insertions(+), 211 deletions(-)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index 38d0eff..97c1fd0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -39,8 +39,12 @@ public class PinotFSFactory {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotFSFactory.class);
private static final String LOCAL_PINOT_FS_SCHEME = "file";
private static final String CLASS = "class";
+ private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String,
PinotFS>() {
+ {
+ put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS());
+ }
+ };
- private static Map<String, PinotFS> PINOT_FS_MAP = new HashMap<>();
public static void register(String scheme, String fsClassName, Configuration
configuration) {
try {
@@ -66,11 +70,6 @@ public class PinotFSFactory {
LOGGER.info("Got scheme {}, classname {}, starting to initialize", key,
fsClassName);
register(key, fsClassName, fsConfig.subset(key));
}
-
- if (!PINOT_FS_MAP.containsKey(LOCAL_PINOT_FS_SCHEME)) {
- LOGGER.info("LocalPinotFS not configured, adding as default");
- PINOT_FS_MAP.put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS());
- }
}
public static PinotFS create(String scheme) {
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 2fbc384..0f3310f 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -83,11 +83,6 @@
<scope>runtime</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 6041dc8..8424c82 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -21,17 +21,12 @@ package org.apache.pinot.tools.admin.command;
import java.io.File;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -42,6 +37,8 @@ import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Command;
import org.kohsuke.args4j.Option;
@@ -269,22 +266,25 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
}
// Filter out all input files.
- final Path dataDirPath = new Path(_dataDir);
- FileSystem fileSystem = FileSystem.get(URI.create(_dataDir), new
Configuration());
+ URI dataDirURI = URI.create(_dataDir);
+ if (dataDirURI.getScheme() == null) {
+ dataDirURI = new File(_dataDir).toURI();
+ }
+ PinotFS pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
- if (!fileSystem.exists(dataDirPath) ||
!fileSystem.isDirectory(dataDirPath)) {
+ if (!pinotFS.exists(dataDirURI) || !pinotFS.isDirectory(dataDirURI)) {
throw new RuntimeException("Data directory " + _dataDir + " not found.");
}
// Gather all data files
- List<Path> dataFilePaths = getDataFilePaths(dataDirPath);
+ String[] dataFilePaths = pinotFS.listFiles(dataDirURI, true);
- if ((dataFilePaths == null) || (dataFilePaths.size() == 0)) {
+ if ((dataFilePaths == null) || (dataFilePaths.length == 0)) {
throw new RuntimeException(
"Data directory " + _dataDir + " does not contain " +
_format.toString().toUpperCase() + " files.");
}
- LOGGER.info("Accepted files: {}",
Arrays.toString(dataFilePaths.toArray()));
+ LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths));
// Make sure output directory does not already exist, or can be
overwritten.
File outDir = new File(_outDir);
@@ -325,20 +325,25 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
ExecutorService executor = Executors.newFixedThreadPool(_numThreads);
int cnt = 0;
- for (final Path dataFilePath : dataFilePaths) {
+ for (final String dataFilePath : dataFilePaths) {
final int segCnt = cnt;
executor.execute(new Runnable() {
@Override
public void run() {
for (int curr = 0; curr <= _retry; curr++) {
+ File localDir = new File(UUID.randomUUID().toString());
try {
SegmentGeneratorConfig config = new
SegmentGeneratorConfig(segmentGeneratorConfig);
-
- String localFile = dataFilePath.getName();
- Path localFilePath = new Path(localFile);
- dataDirPath.getFileSystem(new
Configuration()).copyToLocalFile(dataFilePath, localFilePath);
- config.setInputFilePath(localFile);
+ URI dataFileUri = URI.create(dataFilePath);
+ String[] splits = dataFilePath.split("/");
+ String fileName = splits[splits.length - 1];
+ if (!isDataFile(fileName)) {
+ return;
+ }
+ File localFile = new File(localDir, fileName);
+ pinotFS.copyToLocalFile(dataFileUri, localFile);
+ config.setInputFilePath(localFile.getAbsolutePath());
config.setSegmentName(_segmentName + "_" + segCnt);
Schema schema = Schema.fromFile(new File(_schemaFile));
config.setSchema(schema);
@@ -359,7 +364,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
if (_readerConfigFile != null) {
readerConfig = JsonUtils.fileToObject(new
File(_readerConfigFile), CSVRecordReaderConfig.class);
}
- csvRecordReader.init(new File(localFile), schema,
readerConfig);
+ csvRecordReader.init(localFile, schema, readerConfig);
driver.init(config, csvRecordReader);
break;
default:
@@ -381,6 +386,8 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
} else {
LOGGER.error("Failed to create Pinot segment, retry: {}/{}",
curr + 1, _retry);
}
+ } finally {
+ FileUtils.deleteQuietly(localDir);
}
}
}
@@ -398,9 +405,10 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
try {
try {
localTempDir.getParentFile().mkdirs();
- FileSystem.get(URI.create(indexDir.toString()), new Configuration())
- .copyToLocalFile(new Path(indexDir.toString()), new
Path(localTempDir.toString()));
- } catch (IOException e) {
+ URI indexDirUri = URI.create(indexDir.toString());
+ PinotFS pinotFs = PinotFSFactory.create(indexDirUri.getScheme());
+ pinotFs.copyToLocalFile(indexDirUri, localTempDir);
+ } catch (Exception e) {
LOGGER.error("Failed to copy segment {} to local directory {} for
verification.", indexDir, localTempDir, e);
return false;
}
@@ -419,28 +427,6 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
}
}
- protected List<Path> getDataFilePaths(Path pathPattern)
- throws IOException {
- List<Path> tarFilePaths = new ArrayList<>();
- FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new
Configuration());
- getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern),
tarFilePaths);
- return tarFilePaths;
- }
-
- protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[]
fileStatuses, List<Path> tarFilePaths)
- throws IOException {
- for (FileStatus fileStatus : fileStatuses) {
- Path path = fileStatus.getPath();
- if (fileStatus.isDirectory()) {
- getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path),
tarFilePaths);
- } else {
- if (isDataFile(path.getName())) {
- tarFilePaths.add(path);
- }
- }
- }
- }
-
protected boolean isDataFile(String fileName) {
switch (_format) {
case AVRO:
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java
index 2292c04..c9a069e 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java
@@ -18,13 +18,12 @@
*/
package org.apache.pinot.tools.streams.githubevents;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.joda.time.format.DateTimeFormatter;
@@ -39,43 +38,43 @@ public class PullRequestMergedEvent {
private static final DateTimeFormatter DATE_FORMATTER =
ISODateTimeFormat.dateTimeNoMillis();
// dimensions
- private String _title;
- private List<String> _labels;
- private String _userId;
- private String _userType;
- private String _authorAssociation;
- private String _mergedBy;
- private List<String> _assignees;
- private List<String> _committers;
- private List<String> _authors;
- private List<String> _reviewers;
- private List<String> _commenters;
- private List<String> _requestedReviewers;
- private List<String> _requestedTeams;
- private String _repo;
- private String _organization;
+ private final String _title;
+ private final List<String> _labels;
+ private final String _userId;
+ private final String _userType;
+ private final String _authorAssociation;
+ private final String _mergedBy;
+ private final List<String> _assignees;
+ private final List<String> _committers;
+ private final List<String> _authors;
+ private final List<String> _reviewers;
+ private final List<String> _commenters;
+ private final List<String> _requestedReviewers;
+ private final List<String> _requestedTeams;
+ private final String _repo;
+ private final String _organization;
// metrics
- private long _numComments;
- private long _numReviewComments;
- private long _numCommits;
- private long _numLinesAdded;
- private long _numLinesDeleted;
- private long _numFilesChanged;
- private long _numReviewers;
- private long _numCommenters;
- private long _numCommitters;
- private long _numAuthors;
- private long _createdTimeMillis;
- private long _elapsedTimeMillis;
+ private final long _numComments;
+ private final long _numReviewComments;
+ private final long _numCommits;
+ private final long _numLinesAdded;
+ private final long _numLinesDeleted;
+ private final long _numFilesChanged;
+ private final long _numReviewers;
+ private final long _numCommenters;
+ private final long _numCommitters;
+ private final long _numAuthors;
+ private final long _createdTimeMillis;
+ private final long _elapsedTimeMillis;
// time
- private long _mergedTimeMillis;
+ private final long _mergedTimeMillis;
- private JsonObject _pullRequest;
- private JsonArray _commitsArray;
- private JsonArray _reviewCommentsArray;
- private JsonArray _commentsArray;
+ private final JsonNode _pullRequest;
+ private final JsonNode _commitsArray;
+ private final JsonNode _reviewCommentsArray;
+ private final JsonNode _commentsArray;
/**
* Construct a PullRequestMergedEvent from the github event of type
PullRequestEvent which is also merged and closed.
@@ -85,23 +84,23 @@ public class PullRequestMergedEvent {
* @param reviewComments - review comments data corresponding to the event
* @param comments - comments data corresponding to the event
*/
- public PullRequestMergedEvent(JsonObject event, JsonArray commits, JsonArray
reviewComments, JsonArray comments) {
+ public PullRequestMergedEvent(JsonNode event, JsonNode commits, JsonNode
reviewComments, JsonNode comments) {
- JsonObject payload = event.get("payload").getAsJsonObject();
- _pullRequest = payload.get("pull_request").getAsJsonObject();
+ JsonNode payload = event.get("payload");
+ _pullRequest = payload.get("pull_request");
_commitsArray = commits;
_reviewCommentsArray = reviewComments;
_commentsArray = comments;
// Dimensions
- _title = _pullRequest.get("title").getAsString();
+ _title = _pullRequest.get("title").asText();
_labels = extractLabels();
- JsonObject user = _pullRequest.get("user").getAsJsonObject();
- _userId = user.get("login").getAsString();
- _userType = user.get("type").getAsString();
- _authorAssociation = _pullRequest.get("author_association").getAsString();
- JsonObject mergedBy = _pullRequest.get("merged_by").getAsJsonObject();
- _mergedBy = mergedBy.get("login").getAsString();
+ JsonNode user = _pullRequest.get("user");
+ _userId = user.get("login").asText();
+ _userType = user.get("type").asText();
+ _authorAssociation = _pullRequest.get("author_association").asText();
+ JsonNode mergedBy = _pullRequest.get("merged_by");
+ _mergedBy = mergedBy.get("login").asText();
_assignees = extractAssignees();
_committers = Lists.newArrayList(extractCommitters());
_authors = Lists.newArrayList(extractAuthors());
@@ -109,26 +108,26 @@ public class PullRequestMergedEvent {
_commenters = Lists.newArrayList(extractCommenters());
_requestedReviewers = extractRequestedReviewers();
_requestedTeams = extractRequestedTeams();
- JsonObject repo = event.get("repo").getAsJsonObject();
- String[] repoName = repo.get("name").getAsString().split("/");
+ JsonNode repo = event.get("repo");
+ String[] repoName = repo.get("name").asText().split("/");
_repo = repoName[1];
_organization = repoName[0];
// Metrics
- _numComments = _pullRequest.get("comments").getAsInt();
- _numReviewComments = _pullRequest.get("review_comments").getAsInt();
- _numCommits = _pullRequest.get("commits").getAsInt();
- _numLinesAdded = _pullRequest.get("additions").getAsInt();
- _numLinesDeleted = _pullRequest.get("deletions").getAsInt();
- _numFilesChanged = _pullRequest.get("changed_files").getAsInt();
+ _numComments = _pullRequest.get("comments").asInt();
+ _numReviewComments = _pullRequest.get("review_comments").asInt();
+ _numCommits = _pullRequest.get("commits").asInt();
+ _numLinesAdded = _pullRequest.get("additions").asInt();
+ _numLinesDeleted = _pullRequest.get("deletions").asInt();
+ _numFilesChanged = _pullRequest.get("changed_files").asInt();
_numReviewers = _reviewers.size();
_numCommenters = _commenters.size();
_numCommitters = _committers.size();
_numAuthors = _authors.size();
// Time
- _createdTimeMillis =
DATE_FORMATTER.parseMillis(_pullRequest.get("created_at").getAsString());
- _mergedTimeMillis =
DATE_FORMATTER.parseMillis(_pullRequest.get("merged_at").getAsString());
+ _createdTimeMillis =
DATE_FORMATTER.parseMillis(_pullRequest.get("created_at").asText());
+ _mergedTimeMillis =
DATE_FORMATTER.parseMillis(_pullRequest.get("merged_at").asText());
_elapsedTimeMillis = _mergedTimeMillis - _createdTimeMillis;
}
@@ -137,10 +136,10 @@ public class PullRequestMergedEvent {
*/
private Set<String> extractReviewers() {
Set<String> reviewers;
- if (_reviewCommentsArray != null && !_reviewCommentsArray.isJsonNull() &
_reviewCommentsArray.size() > 0) {
+ if (_reviewCommentsArray != null && _reviewCommentsArray.size() > 0) {
reviewers = new HashSet<>();
- for (JsonElement reviewComment : _reviewCommentsArray) {
-
reviewers.add(reviewComment.getAsJsonObject().get("user").getAsJsonObject().get("login").getAsString());
+ for (JsonNode reviewComment : _reviewCommentsArray) {
+ reviewers.add(reviewComment.get("user").get("login").asText());
}
} else {
reviewers = Collections.emptySet();
@@ -153,10 +152,10 @@ public class PullRequestMergedEvent {
*/
private Set<String> extractCommenters() {
Set<String> commenters;
- if (_commentsArray != null && !_commentsArray.isJsonNull() &
_commentsArray.size() > 0) {
+ if (_commentsArray != null && _commentsArray.size() > 0) {
commenters = new HashSet<>();
- for (JsonElement comment : _commentsArray) {
-
commenters.add(comment.getAsJsonObject().get("user").getAsJsonObject().get("login").getAsString());
+ for (JsonNode comment : _commentsArray) {
+ commenters.add(comment.get("user").get("login").asText());
}
} else {
commenters = Collections.emptySet();
@@ -169,17 +168,15 @@ public class PullRequestMergedEvent {
*/
private Set<String> extractCommitters() {
Set<String> committers;
- if (_commitsArray != null && !_commitsArray.isJsonNull() &
_commitsArray.size() > 0) {
+ if (_commitsArray != null && _commitsArray.size() > 0) {
committers = new HashSet<>();
- for (JsonElement commit : _commitsArray) {
- JsonObject commitAsJsonObject = commit.getAsJsonObject();
- JsonElement committer = commitAsJsonObject.get("committer");
- if (committer.isJsonNull()) {
- committers.add(
-
commitAsJsonObject.get("commit").getAsJsonObject().get("committer").getAsJsonObject().get("name")
- .getAsString());
+ for (JsonNode commit : _commitsArray) {
+ JsonNode commitAsJsonNode = commit;
+ JsonNode committer = commitAsJsonNode.get("committer");
+ if (committer.size() == 0) {
+
committers.add(commitAsJsonNode.get("commit").get("committer").get("name").asText());
} else {
-
committers.add(committer.getAsJsonObject().get("login").getAsString());
+ committers.add(committer.get("login").asText());
}
}
} else {
@@ -193,16 +190,15 @@ public class PullRequestMergedEvent {
*/
private Set<String> extractAuthors() {
Set<String> authors;
- if (_commitsArray != null && !_commitsArray.isJsonNull() &
_commitsArray.size() > 0) {
+ if (_commitsArray != null && _commitsArray.size() > 0) {
authors = new HashSet<>();
- for (JsonElement commit : _commitsArray) {
- JsonObject commitAsJsonObject = commit.getAsJsonObject();
- JsonElement author = commitAsJsonObject.get("author");
- if (author.isJsonNull()) {
-
authors.add(commitAsJsonObject.get("commit").getAsJsonObject().get("author").getAsJsonObject().get("name")
- .getAsString());
+ for (JsonNode commit : _commitsArray) {
+ JsonNode commitAsJsonNode = commit;
+ JsonNode author = commitAsJsonNode.get("author");
+ if (author.size() == 0) {
+
authors.add(commitAsJsonNode.get("commit").get("author").get("name").asText());
} else {
- authors.add(author.getAsJsonObject().get("login").getAsString());
+ authors.add(author.get("login").asText());
}
}
} else {
@@ -215,16 +211,10 @@ public class PullRequestMergedEvent {
* Extracts labels for the PR
*/
private List<String> extractLabels() {
- JsonArray labelsArray = _pullRequest.get("labels").getAsJsonArray();
- List<String> labels;
- int size = labelsArray.size();
- if (size > 0) {
- labels = new ArrayList<>(size);
- for (JsonElement label : labelsArray) {
- labels.add(label.getAsJsonObject().get("name").getAsString());
- }
- } else {
- labels = Collections.emptyList();
+ Iterator<JsonNode> labelsIterator = _pullRequest.get("labels").elements();
+ List<String> labels = new ArrayList<>();
+ while (labelsIterator.hasNext()) {
+ labels.add(labelsIterator.next().get("name").asText());
}
return labels;
}
@@ -233,16 +223,10 @@ public class PullRequestMergedEvent {
* Extracts assignees for the PR
*/
private List<String> extractAssignees() {
- JsonArray assigneesJson = _pullRequest.get("assignees").getAsJsonArray();
- int numAssignees = assigneesJson.size();
- List<String> assignees;
- if (numAssignees > 0) {
- assignees = new ArrayList<>(numAssignees);
- for (JsonElement reviewer : assigneesJson) {
- assignees.add(reviewer.getAsJsonObject().get("login").getAsString());
- }
- } else {
- assignees = Collections.emptyList();
+ Iterator<JsonNode> assigneesIterator =
_pullRequest.get("assignees").elements();
+ List<String> assignees = new ArrayList<>();
+ while (assigneesIterator.hasNext()) {
+ assignees.add(assigneesIterator.next().get("login").asText());
}
return assignees;
}
@@ -251,16 +235,10 @@ public class PullRequestMergedEvent {
* Extracts list of requested reviewers
*/
private List<String> extractRequestedReviewers() {
- JsonArray requestedReviewersJson =
_pullRequest.get("requested_reviewers").getAsJsonArray();
- int numRequestedReviewers = requestedReviewersJson.size();
- List<String> requestedReviewers;
- if (numRequestedReviewers > 0) {
- requestedReviewers = new ArrayList<>(numRequestedReviewers);
- for (JsonElement reviewer : requestedReviewersJson) {
-
requestedReviewers.add(reviewer.getAsJsonObject().get("login").getAsString());
- }
- } else {
- requestedReviewers = Collections.emptyList();
+ Iterator<JsonNode> requestedReviewersIterator =
_pullRequest.get("requested_reviewers").elements();
+ List<String> requestedReviewers = new ArrayList<>();
+ while (requestedReviewersIterator.hasNext()) {
+
requestedReviewers.add(requestedReviewersIterator.next().get("login").asText());
}
return requestedReviewers;
}
@@ -269,16 +247,11 @@ public class PullRequestMergedEvent {
* Extracts list of review requested teams
*/
private List<String> extractRequestedTeams() {
- JsonArray requestedTeamsJson =
_pullRequest.get("requested_teams").getAsJsonArray();
- int numRequestedTeams = requestedTeamsJson.size();
- List<String> requestedTeams;
- if (numRequestedTeams > 0) {
- requestedTeams = new ArrayList<>(numRequestedTeams);
- for (JsonElement team : requestedTeamsJson) {
- requestedTeams.add(team.getAsJsonObject().get("name").getAsString());
- }
- } else {
- requestedTeams = Collections.emptyList();
+
+ Iterator<JsonNode> requestedTeamsIterator =
_pullRequest.get("requested_teams").elements();
+ List<String> requestedTeams = new ArrayList<>();
+ while (requestedTeamsIterator.hasNext()) {
+ requestedTeams.add(requestedTeamsIterator.next().get("name").asText());
}
return requestedTeams;
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
index d9866b4..c9d8f5f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
@@ -18,11 +18,8 @@
*/
package org.apache.pinot.tools.streams.githubevents;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
import java.io.File;
import java.io.IOException;
import java.net.URL;
@@ -36,6 +33,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.slf4j.Logger;
@@ -54,12 +52,12 @@ public class PullRequestMergedEventsStream {
private static final Logger LOGGER =
LoggerFactory.getLogger(PullRequestMergedEventsStream.class);
private static final long SLEEP_MILLIS = 10_000;
- private ExecutorService _service;
+ private final ExecutorService _service;
private boolean _keepStreaming = true;
- private Schema _avroSchema;
- private String _topicName;
- private GitHubAPICaller _gitHubAPICaller;
+ private final Schema _avroSchema;
+ private final String _topicName;
+ private final GitHubAPICaller _gitHubAPICaller;
private StreamDataProducer _producer;
@@ -93,6 +91,17 @@ public class PullRequestMergedEventsStream {
_producer =
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
properties);
}
+ public static void main(String[] args)
+ throws Exception {
+ String personalAccessToken = args[0];
+ String schemaFile = args[1];
+ String topic = "pullRequestMergedEvent";
+ PullRequestMergedEventsStream stream =
+ new PullRequestMergedEventsStream(schemaFile, topic,
KafkaStarterUtils.DEFAULT_KAFKA_BROKER,
+ personalAccessToken);
+ stream.execute();
+ }
+
/**
* Starts the stream.
* Adds shutdown hook.
@@ -150,9 +159,8 @@ public class PullRequestMergedEventsStream {
switch (githubAPIResponse.statusCode) {
case 200: // Read new events
etag = githubAPIResponse.etag;
- JsonArray jsonArray = new
JsonParser().parse(githubAPIResponse.responseString).getAsJsonArray();
-
- for (JsonElement eventElement : jsonArray) {
+ JsonNode jsonArray =
JsonUtils.stringToJsonNode(githubAPIResponse.responseString);
+ for (JsonNode eventElement : jsonArray) {
try {
GenericRecord genericRecord =
convertToPullRequestMergedGenericRecord(eventElement);
if (genericRecord != null) {
@@ -206,41 +214,41 @@ public class PullRequestMergedEventsStream {
* Find commits, review comments, comments corresponding to this pull
request event.
* Construct a PullRequestMergedEvent with the help of the event, commits,
review comments and comments.
* Converts PullRequestMergedEvent to GenericRecord
+ * @param event
*/
- private GenericRecord convertToPullRequestMergedGenericRecord(JsonElement
eventJson)
+ private GenericRecord convertToPullRequestMergedGenericRecord(JsonNode event)
throws IOException {
GenericRecord genericRecord = null;
- JsonObject event = eventJson.getAsJsonObject();
- String type = event.get("type").getAsString();
+ String type = event.get("type").asText();
if ("PullRequestEvent".equals(type)) {
- JsonObject payload = event.get("payload").getAsJsonObject();
+ JsonNode payload = event.get("payload");
if (payload != null) {
- String action = payload.get("action").getAsString();
- JsonObject pullRequest = payload.get("pull_request").getAsJsonObject();
- String merged = pullRequest.get("merged").getAsString();
+ String action = payload.get("action").asText();
+ JsonNode pullRequest = payload.get("pull_request");
+ String merged = pullRequest.get("merged").asText();
if ("closed".equals(action) && "true".equals(merged)) { // valid pull
request merge event
- JsonArray commits = null;
- String commitsURL = pullRequest.get("commits_url").getAsString();
+ JsonNode commits = null;
+ String commitsURL = pullRequest.get("commits_url").asText();
GitHubAPICaller.GitHubAPIResponse commitsResponse =
_gitHubAPICaller.callAPI(commitsURL);
if (commitsResponse.responseString != null) {
- commits = new
JsonParser().parse(commitsResponse.responseString).getAsJsonArray();
+ commits =
JsonUtils.stringToJsonNode(commitsResponse.responseString);
}
- JsonArray reviewComments = null;
- String reviewCommentsURL =
pullRequest.get("review_comments_url").getAsString();
+ JsonNode reviewComments = null;
+ String reviewCommentsURL =
pullRequest.get("review_comments_url").asText();
GitHubAPICaller.GitHubAPIResponse reviewCommentsResponse =
_gitHubAPICaller.callAPI(reviewCommentsURL);
if (reviewCommentsResponse.responseString != null) {
- reviewComments = new
JsonParser().parse(reviewCommentsResponse.responseString).getAsJsonArray();
+ reviewComments =
JsonUtils.stringToJsonNode(reviewCommentsResponse.responseString);
}
- JsonArray comments = null;
- String commentsURL = pullRequest.get("comments_url").getAsString();
+ JsonNode comments = null;
+ String commentsURL = pullRequest.get("comments_url").asText();
GitHubAPICaller.GitHubAPIResponse commentsResponse =
_gitHubAPICaller.callAPI(commentsURL);
if (commentsResponse.responseString != null) {
- comments = new
JsonParser().parse(commentsResponse.responseString).getAsJsonArray();
+ comments =
JsonUtils.stringToJsonNode(commentsResponse.responseString);
}
// get PullRequestMergeEvent
@@ -296,15 +304,4 @@ public class PullRequestMergedEventsStream {
return genericRecord;
}
-
- public static void main(String[] args)
- throws Exception {
- String personalAccessToken = args[0];
- String schemaFile = args[1];
- String topic = "pullRequestMergedEvent";
- PullRequestMergedEventsStream stream =
- new PullRequestMergedEventsStream(schemaFile, topic,
KafkaStarterUtils.DEFAULT_KAFKA_BROKER,
- personalAccessToken);
- stream.execute();
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]