This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4ed36ff Remove hadoop dependency in Create Segment Command (#5271) 4ed36ff is described below commit 4ed36ff6653c53a8780e9a9d68b370561c251f68 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Tue Apr 21 00:15:33 2020 -0700 Remove hadoop dependency in Create Segment Command (#5271) * Remove hadoop dependency in Create Segment Command * move gson to fasterxml for github events quickstart * Address comments --- .../pinot/spi/filesystem/PinotFSFactory.java | 11 +- pinot-tools/pom.xml | 5 - .../tools/admin/command/CreateSegmentCommand.java | 74 +++----- .../githubevents/PullRequestMergedEvent.java | 209 +++++++++------------ .../PullRequestMergedEventsStream.java | 73 ++++--- 5 files changed, 161 insertions(+), 211 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java index 38d0eff..97c1fd0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java @@ -39,8 +39,12 @@ public class PinotFSFactory { private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class); private static final String LOCAL_PINOT_FS_SCHEME = "file"; private static final String CLASS = "class"; + private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() { + { + put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS()); + } + }; - private static Map<String, PinotFS> PINOT_FS_MAP = new HashMap<>(); public static void register(String scheme, String fsClassName, Configuration configuration) { try { @@ -66,11 +70,6 @@ public class PinotFSFactory { LOGGER.info("Got scheme {}, classname {}, starting to initialize", key, fsClassName); register(key, fsClassName, fsConfig.subset(key)); } - - if (!PINOT_FS_MAP.containsKey(LOCAL_PINOT_FS_SCHEME)) { - LOGGER.info("LocalPinotFS not configured, adding as default"); - PINOT_FS_MAP.put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS()); - } } public static PinotFS create(String scheme) { diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml index 2fbc384..0f3310f 100644 --- a/pinot-tools/pom.xml +++ b/pinot-tools/pom.xml @@ -83,11 +83,6 @@ <scope>runtime</scope> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>compile</scope> - </dependency> - <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java index 6041dc8..8424c82 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java @@ -21,17 +21,12 @@ package org.apache.pinot.tools.admin.command; import java.io.File; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.pinot.common.segment.ReadMode; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; @@ -42,6 +37,8 @@ import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.Command; import org.kohsuke.args4j.Option; @@ -269,22 +266,25 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co } // Filter out all input files. - final Path dataDirPath = new Path(_dataDir); - FileSystem fileSystem = FileSystem.get(URI.create(_dataDir), new Configuration()); + URI dataDirURI = URI.create(_dataDir); + if (dataDirURI.getScheme() == null) { + dataDirURI = new File(_dataDir).toURI(); + } + PinotFS pinotFS = PinotFSFactory.create(dataDirURI.getScheme()); - if (!fileSystem.exists(dataDirPath) || !fileSystem.isDirectory(dataDirPath)) { + if (!pinotFS.exists(dataDirURI) || !pinotFS.isDirectory(dataDirURI)) { throw new RuntimeException("Data directory " + _dataDir + " not found."); } // Gather all data files - List<Path> dataFilePaths = getDataFilePaths(dataDirPath); + String[] dataFilePaths = pinotFS.listFiles(dataDirURI, true); - if ((dataFilePaths == null) || (dataFilePaths.size() == 0)) { + if ((dataFilePaths == null) || (dataFilePaths.length == 0)) { throw new RuntimeException( "Data directory " + _dataDir + " does not contain " + _format.toString().toUpperCase() + " files."); } - LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths.toArray())); + LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths)); // Make sure output directory does not already exist, or can be overwritten. File outDir = new File(_outDir); @@ -325,20 +325,25 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co ExecutorService executor = Executors.newFixedThreadPool(_numThreads); int cnt = 0; - for (final Path dataFilePath : dataFilePaths) { + for (final String dataFilePath : dataFilePaths) { final int segCnt = cnt; executor.execute(new Runnable() { @Override public void run() { for (int curr = 0; curr <= _retry; curr++) { + File localDir = new File(UUID.randomUUID().toString()); try { SegmentGeneratorConfig config = new SegmentGeneratorConfig(segmentGeneratorConfig); - - String localFile = dataFilePath.getName(); - Path localFilePath = new Path(localFile); - dataDirPath.getFileSystem(new Configuration()).copyToLocalFile(dataFilePath, localFilePath); - config.setInputFilePath(localFile); + URI dataFileUri = URI.create(dataFilePath); + String[] splits = dataFilePath.split("/"); + String fileName = splits[splits.length - 1]; + if (!isDataFile(fileName)) { + return; + } + File localFile = new File(localDir, fileName); + pinotFS.copyToLocalFile(dataFileUri, localFile); + config.setInputFilePath(localFile.getAbsolutePath()); config.setSegmentName(_segmentName + "_" + segCnt); Schema schema = Schema.fromFile(new File(_schemaFile)); config.setSchema(schema); @@ -359,7 +364,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co if (_readerConfigFile != null) { readerConfig = JsonUtils.fileToObject(new File(_readerConfigFile), CSVRecordReaderConfig.class); } - csvRecordReader.init(new File(localFile), schema, readerConfig); + csvRecordReader.init(localFile, schema, readerConfig); driver.init(config, csvRecordReader); break; default: @@ -381,6 +386,8 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co } else { LOGGER.error("Failed to create Pinot segment, retry: {}/{}", curr + 1, _retry); } + } finally { + FileUtils.deleteQuietly(localDir); } } } @@ -398,9 +405,10 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co try { try { localTempDir.getParentFile().mkdirs(); - FileSystem.get(URI.create(indexDir.toString()), new Configuration()) - .copyToLocalFile(new Path(indexDir.toString()), new Path(localTempDir.toString())); - } catch (IOException e) { + URI indexDirUri = URI.create(indexDir.toString()); + PinotFS pinotFs = PinotFSFactory.create(indexDirUri.getScheme()); + pinotFs.copyToLocalFile(indexDirUri, localTempDir); + } catch (Exception e) { LOGGER.error("Failed to copy segment {} to local directory {} for verification.", indexDir, localTempDir, e); return false; } @@ -419,28 +427,6 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co } } - protected List<Path> getDataFilePaths(Path pathPattern) - throws IOException { - List<Path> tarFilePaths = new ArrayList<>(); - FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new Configuration()); - getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), tarFilePaths); - return tarFilePaths; - } - - protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths) - throws IOException { - for (FileStatus fileStatus : fileStatuses) { - Path path = fileStatus.getPath(); - if (fileStatus.isDirectory()) { - getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths); - } else { - if (isDataFile(path.getName())) { - tarFilePaths.add(path); - } - } - } - } - protected boolean isDataFile(String fileName) { switch (_format) { case AVRO: diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java index 2292c04..c9a069e 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEvent.java @@ -18,13 +18,12 @@ */ package org.apache.pinot.tools.streams.githubevents; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; import org.joda.time.format.DateTimeFormatter; @@ -39,43 +38,43 @@ public class PullRequestMergedEvent { private static final DateTimeFormatter DATE_FORMATTER = ISODateTimeFormat.dateTimeNoMillis(); // dimensions - private String _title; - private List<String> _labels; - private String _userId; - private String _userType; - private String _authorAssociation; - private String _mergedBy; - private List<String> _assignees; - private List<String> _committers; - private List<String> _authors; - private List<String> _reviewers; - private List<String> _commenters; - private List<String> _requestedReviewers; - private List<String> _requestedTeams; - private String _repo; - private String _organization; + private final String _title; + private final List<String> _labels; + private final String _userId; + private final String _userType; + private final String _authorAssociation; + private final String _mergedBy; + private final List<String> _assignees; + private final List<String> _committers; + private final List<String> _authors; + private final List<String> _reviewers; + private final List<String> _commenters; + private final List<String> _requestedReviewers; + private final List<String> _requestedTeams; + private final String _repo; + private final String _organization; // metrics - private long _numComments; - private long _numReviewComments; - private long _numCommits; - private long _numLinesAdded; - private long _numLinesDeleted; - private long _numFilesChanged; - private long _numReviewers; - private long _numCommenters; - private long _numCommitters; - private long _numAuthors; - private long _createdTimeMillis; - private long _elapsedTimeMillis; + private final long _numComments; + private final long _numReviewComments; + private final long _numCommits; + private final long _numLinesAdded; + private final long _numLinesDeleted; + private final long _numFilesChanged; + private final long _numReviewers; + private final long _numCommenters; + private final long _numCommitters; + private final long _numAuthors; + private final long _createdTimeMillis; + private final long _elapsedTimeMillis; // time - private long _mergedTimeMillis; + private final long _mergedTimeMillis; - private JsonObject _pullRequest; - private JsonArray _commitsArray; - private JsonArray _reviewCommentsArray; - private JsonArray _commentsArray; + private final JsonNode _pullRequest; + private final JsonNode _commitsArray; + private final JsonNode _reviewCommentsArray; + private final JsonNode _commentsArray; /** * Construct a PullRequestMergedEvent from the github event of type PullRequestEvent which is also merged and closed. @@ -85,23 +84,23 @@ public class PullRequestMergedEvent { * @param reviewComments - review comments data corresponding to the event * @param comments - comments data corresponding to the event */ - public PullRequestMergedEvent(JsonObject event, JsonArray commits, JsonArray reviewComments, JsonArray comments) { + public PullRequestMergedEvent(JsonNode event, JsonNode commits, JsonNode reviewComments, JsonNode comments) { - JsonObject payload = event.get("payload").getAsJsonObject(); - _pullRequest = payload.get("pull_request").getAsJsonObject(); + JsonNode payload = event.get("payload"); + _pullRequest = payload.get("pull_request"); _commitsArray = commits; _reviewCommentsArray = reviewComments; _commentsArray = comments; // Dimensions - _title = _pullRequest.get("title").getAsString(); + _title = _pullRequest.get("title").asText(); _labels = extractLabels(); - JsonObject user = _pullRequest.get("user").getAsJsonObject(); - _userId = user.get("login").getAsString(); - _userType = user.get("type").getAsString(); - _authorAssociation = _pullRequest.get("author_association").getAsString(); - JsonObject mergedBy = _pullRequest.get("merged_by").getAsJsonObject(); - _mergedBy = mergedBy.get("login").getAsString(); + JsonNode user = _pullRequest.get("user"); + _userId = user.get("login").asText(); + _userType = user.get("type").asText(); + _authorAssociation = _pullRequest.get("author_association").asText(); + JsonNode mergedBy = _pullRequest.get("merged_by"); + _mergedBy = mergedBy.get("login").asText(); _assignees = extractAssignees(); _committers = Lists.newArrayList(extractCommitters()); _authors = Lists.newArrayList(extractAuthors()); @@ -109,26 +108,26 @@ public class PullRequestMergedEvent { _commenters = Lists.newArrayList(extractCommenters()); _requestedReviewers = extractRequestedReviewers(); _requestedTeams = extractRequestedTeams(); - JsonObject repo = event.get("repo").getAsJsonObject(); - String[] repoName = repo.get("name").getAsString().split("/"); + JsonNode repo = event.get("repo"); + String[] repoName = repo.get("name").asText().split("/"); _repo = repoName[1]; _organization = repoName[0]; // Metrics - _numComments = _pullRequest.get("comments").getAsInt(); - _numReviewComments = _pullRequest.get("review_comments").getAsInt(); - _numCommits = _pullRequest.get("commits").getAsInt(); - _numLinesAdded = _pullRequest.get("additions").getAsInt(); - _numLinesDeleted = _pullRequest.get("deletions").getAsInt(); - _numFilesChanged = _pullRequest.get("changed_files").getAsInt(); + _numComments = _pullRequest.get("comments").asInt(); + _numReviewComments = _pullRequest.get("review_comments").asInt(); + _numCommits = _pullRequest.get("commits").asInt(); + _numLinesAdded = _pullRequest.get("additions").asInt(); + _numLinesDeleted = _pullRequest.get("deletions").asInt(); + _numFilesChanged = _pullRequest.get("changed_files").asInt(); _numReviewers = _reviewers.size(); _numCommenters = _commenters.size(); _numCommitters = _committers.size(); _numAuthors = _authors.size(); // Time - _createdTimeMillis = DATE_FORMATTER.parseMillis(_pullRequest.get("created_at").getAsString()); - _mergedTimeMillis = DATE_FORMATTER.parseMillis(_pullRequest.get("merged_at").getAsString()); + _createdTimeMillis = DATE_FORMATTER.parseMillis(_pullRequest.get("created_at").asText()); + _mergedTimeMillis = DATE_FORMATTER.parseMillis(_pullRequest.get("merged_at").asText()); _elapsedTimeMillis = _mergedTimeMillis - _createdTimeMillis; } @@ -137,10 +136,10 @@ public class PullRequestMergedEvent { */ private Set<String> extractReviewers() { Set<String> reviewers; - if (_reviewCommentsArray != null && !_reviewCommentsArray.isJsonNull() & _reviewCommentsArray.size() > 0) { + if (_reviewCommentsArray != null && _reviewCommentsArray.size() > 0) { reviewers = new HashSet<>(); - for (JsonElement reviewComment : _reviewCommentsArray) { - reviewers.add(reviewComment.getAsJsonObject().get("user").getAsJsonObject().get("login").getAsString()); + for (JsonNode reviewComment : _reviewCommentsArray) { + reviewers.add(reviewComment.get("user").get("login").asText()); } } else { reviewers = Collections.emptySet(); @@ -153,10 +152,10 @@ public class PullRequestMergedEvent { */ private Set<String> extractCommenters() { Set<String> commenters; - if (_commentsArray != null && !_commentsArray.isJsonNull() & _commentsArray.size() > 0) { + if (_commentsArray != null && _commentsArray.size() > 0) { commenters = new HashSet<>(); - for (JsonElement comment : _commentsArray) { - commenters.add(comment.getAsJsonObject().get("user").getAsJsonObject().get("login").getAsString()); + for (JsonNode comment : _commentsArray) { + commenters.add(comment.get("user").get("login").asText()); } } else { commenters = Collections.emptySet(); @@ -169,17 +168,15 @@ public class PullRequestMergedEvent { */ private Set<String> extractCommitters() { Set<String> committers; - if (_commitsArray != null && !_commitsArray.isJsonNull() & _commitsArray.size() > 0) { + if (_commitsArray != null && _commitsArray.size() > 0) { committers = new HashSet<>(); - for (JsonElement commit : _commitsArray) { - JsonObject commitAsJsonObject = commit.getAsJsonObject(); - JsonElement committer = commitAsJsonObject.get("committer"); - if (committer.isJsonNull()) { - committers.add( - commitAsJsonObject.get("commit").getAsJsonObject().get("committer").getAsJsonObject().get("name") - .getAsString()); + for (JsonNode commit : _commitsArray) { + JsonNode commitAsJsonNode = commit; + JsonNode committer = commitAsJsonNode.get("committer"); + if (committer.size() == 0) { + committers.add(commitAsJsonNode.get("commit").get("committer").get("name").asText()); } else { - committers.add(committer.getAsJsonObject().get("login").getAsString()); + committers.add(committer.get("login").asText()); } } } else { @@ -193,16 +190,15 @@ public class PullRequestMergedEvent { */ private Set<String> extractAuthors() { Set<String> authors; - if (_commitsArray != null && !_commitsArray.isJsonNull() & _commitsArray.size() > 0) { + if (_commitsArray != null && _commitsArray.size() > 0) { authors = new HashSet<>(); - for (JsonElement commit : _commitsArray) { - JsonObject commitAsJsonObject = commit.getAsJsonObject(); - JsonElement author = commitAsJsonObject.get("author"); - if (author.isJsonNull()) { - authors.add(commitAsJsonObject.get("commit").getAsJsonObject().get("author").getAsJsonObject().get("name") - .getAsString()); + for (JsonNode commit : _commitsArray) { + JsonNode commitAsJsonNode = commit; + JsonNode author = commitAsJsonNode.get("author"); + if (author.size() == 0) { + authors.add(commitAsJsonNode.get("commit").get("author").get("name").asText()); } else { - authors.add(author.getAsJsonObject().get("login").getAsString()); + authors.add(author.get("login").asText()); } } } else { @@ -215,16 +211,10 @@ public class PullRequestMergedEvent { * Extracts labels for the PR */ private List<String> extractLabels() { - JsonArray labelsArray = _pullRequest.get("labels").getAsJsonArray(); - List<String> labels; - int size = labelsArray.size(); - if (size > 0) { - labels = new ArrayList<>(size); - for (JsonElement label : labelsArray) { - labels.add(label.getAsJsonObject().get("name").getAsString()); - } - } else { - labels = Collections.emptyList(); + Iterator<JsonNode> labelsIterator = _pullRequest.get("labels").elements(); + List<String> labels = new ArrayList<>(); + while (labelsIterator.hasNext()) { + labels.add(labelsIterator.next().get("name").asText()); } return labels; } @@ -233,16 +223,10 @@ public class PullRequestMergedEvent { * Extracts assignees for the PR */ private List<String> extractAssignees() { - JsonArray assigneesJson = _pullRequest.get("assignees").getAsJsonArray(); - int numAssignees = assigneesJson.size(); - List<String> assignees; - if (numAssignees > 0) { - assignees = new ArrayList<>(numAssignees); - for (JsonElement reviewer : assigneesJson) { - assignees.add(reviewer.getAsJsonObject().get("login").getAsString()); - } - } else { - assignees = Collections.emptyList(); + Iterator<JsonNode> assigneesIterator = _pullRequest.get("assignees").elements(); + List<String> assignees = new ArrayList<>(); + while (assigneesIterator.hasNext()) { + assignees.add(assigneesIterator.next().get("login").asText()); } return assignees; } @@ -251,16 +235,10 @@ public class PullRequestMergedEvent { * Extracts list of requested reviewers */ private List<String> extractRequestedReviewers() { - JsonArray requestedReviewersJson = _pullRequest.get("requested_reviewers").getAsJsonArray(); - int numRequestedReviewers = requestedReviewersJson.size(); - List<String> requestedReviewers; - if (numRequestedReviewers > 0) { - requestedReviewers = new ArrayList<>(numRequestedReviewers); - for (JsonElement reviewer : requestedReviewersJson) { - requestedReviewers.add(reviewer.getAsJsonObject().get("login").getAsString()); - } - } else { - requestedReviewers = Collections.emptyList(); + Iterator<JsonNode> requestedReviewersIterator = _pullRequest.get("requested_reviewers").elements(); + List<String> requestedReviewers = new ArrayList<>(); + while (requestedReviewersIterator.hasNext()) { + requestedReviewers.add(requestedReviewersIterator.next().get("login").asText()); } return requestedReviewers; } @@ -269,16 +247,11 @@ public class PullRequestMergedEvent { * Extracts list of review requested teams */ private List<String> extractRequestedTeams() { - JsonArray requestedTeamsJson = _pullRequest.get("requested_teams").getAsJsonArray(); - int numRequestedTeams = requestedTeamsJson.size(); - List<String> requestedTeams; - if (numRequestedTeams > 0) { - requestedTeams = new ArrayList<>(numRequestedTeams); - for (JsonElement team : requestedTeamsJson) { - requestedTeams.add(team.getAsJsonObject().get("name").getAsString()); - } - } else { - requestedTeams = Collections.emptyList(); + + Iterator<JsonNode> requestedTeamsIterator = _pullRequest.get("requested_teams").elements(); + List<String> requestedTeams = new ArrayList<>(); + while (requestedTeamsIterator.hasNext()) { + requestedTeams.add(requestedTeamsIterator.next().get("name").asText()); } return requestedTeams; } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java index d9866b4..c9d8f5f 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java @@ -18,11 +18,8 @@ */ package org.apache.pinot.tools.streams.githubevents; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import java.io.File; import java.io.IOException; import java.net.URL; @@ -36,6 +33,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.pinot.plugin.inputformat.avro.AvroUtils; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.Quickstart; import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.slf4j.Logger; @@ -54,12 +52,12 @@ public class PullRequestMergedEventsStream { private static final Logger LOGGER = LoggerFactory.getLogger(PullRequestMergedEventsStream.class); private static final long SLEEP_MILLIS = 10_000; - private ExecutorService _service; + private final ExecutorService _service; private boolean _keepStreaming = true; - private Schema _avroSchema; - private String _topicName; - private GitHubAPICaller _gitHubAPICaller; + private final Schema _avroSchema; + private final String _topicName; + private final GitHubAPICaller _gitHubAPICaller; private StreamDataProducer _producer; @@ -93,6 +91,17 @@ public class PullRequestMergedEventsStream { _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); } + public static void main(String[] args) + throws Exception { + String personalAccessToken = args[0]; + String schemaFile = args[1]; + String topic = "pullRequestMergedEvent"; + PullRequestMergedEventsStream stream = + new PullRequestMergedEventsStream(schemaFile, topic, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, + personalAccessToken); + stream.execute(); + } + /** * Starts the stream. * Adds shutdown hook. @@ -150,9 +159,8 @@ public class PullRequestMergedEventsStream { switch (githubAPIResponse.statusCode) { case 200: // Read new events etag = githubAPIResponse.etag; - JsonArray jsonArray = new JsonParser().parse(githubAPIResponse.responseString).getAsJsonArray(); - - for (JsonElement eventElement : jsonArray) { + JsonNode jsonArray = JsonUtils.stringToJsonNode(githubAPIResponse.responseString); + for (JsonNode eventElement : jsonArray) { try { GenericRecord genericRecord = convertToPullRequestMergedGenericRecord(eventElement); if (genericRecord != null) { @@ -206,41 +214,41 @@ public class PullRequestMergedEventsStream { * Find commits, review comments, comments corresponding to this pull request event. * Construct a PullRequestMergedEvent with the help of the event, commits, review comments and comments. * Converts PullRequestMergedEvent to GenericRecord + * @param event */ - private GenericRecord convertToPullRequestMergedGenericRecord(JsonElement eventJson) + private GenericRecord convertToPullRequestMergedGenericRecord(JsonNode event) throws IOException { GenericRecord genericRecord = null; - JsonObject event = eventJson.getAsJsonObject(); - String type = event.get("type").getAsString(); + String type = event.get("type").asText(); if ("PullRequestEvent".equals(type)) { - JsonObject payload = event.get("payload").getAsJsonObject(); + JsonNode payload = event.get("payload"); if (payload != null) { - String action = payload.get("action").getAsString(); - JsonObject pullRequest = payload.get("pull_request").getAsJsonObject(); - String merged = pullRequest.get("merged").getAsString(); + String action = payload.get("action").asText(); + JsonNode pullRequest = payload.get("pull_request"); + String merged = pullRequest.get("merged").asText(); if ("closed".equals(action) && "true".equals(merged)) { // valid pull request merge event - JsonArray commits = null; - String commitsURL = pullRequest.get("commits_url").getAsString(); + JsonNode commits = null; + String commitsURL = pullRequest.get("commits_url").asText(); GitHubAPICaller.GitHubAPIResponse commitsResponse = _gitHubAPICaller.callAPI(commitsURL); if (commitsResponse.responseString != null) { - commits = new JsonParser().parse(commitsResponse.responseString).getAsJsonArray(); + commits = JsonUtils.stringToJsonNode(commitsResponse.responseString); } - JsonArray reviewComments = null; - String reviewCommentsURL = pullRequest.get("review_comments_url").getAsString(); + JsonNode reviewComments = null; + String reviewCommentsURL = pullRequest.get("review_comments_url").asText(); GitHubAPICaller.GitHubAPIResponse reviewCommentsResponse = _gitHubAPICaller.callAPI(reviewCommentsURL); if (reviewCommentsResponse.responseString != null) { - reviewComments = new JsonParser().parse(reviewCommentsResponse.responseString).getAsJsonArray(); + reviewComments = JsonUtils.stringToJsonNode(reviewCommentsResponse.responseString); } - JsonArray comments = null; - String commentsURL = pullRequest.get("comments_url").getAsString(); + JsonNode comments = null; + String commentsURL = pullRequest.get("comments_url").asText(); GitHubAPICaller.GitHubAPIResponse commentsResponse = _gitHubAPICaller.callAPI(commentsURL); if (commentsResponse.responseString != null) { - comments = new JsonParser().parse(commentsResponse.responseString).getAsJsonArray(); + comments = JsonUtils.stringToJsonNode(commentsResponse.responseString); } // get PullRequestMergeEvent @@ -296,15 +304,4 @@ public class PullRequestMergedEventsStream { return genericRecord; } - - public static void main(String[] args) - throws Exception { - String personalAccessToken = args[0]; - String schemaFile = args[1]; - String topic = "pullRequestMergedEvent"; - PullRequestMergedEventsStream stream = - new PullRequestMergedEventsStream(schemaFile, topic, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, - personalAccessToken); - stream.execute(); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org