Repository: incubator-gobblin Updated Branches: refs/heads/master ab4fc217d -> 249d5a19c
[GOBBLIN-352] Add GithubJsonDataToParquet example in gobblin-example Closes #2222 from tilakpatidar/parquet_example Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/249d5a19 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/249d5a19 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/249d5a19 Branch: refs/heads/master Commit: 249d5a19c4acc2aa6448c92f7e5a5cac372534ed Parents: ab4fc21 Author: tilakpatidar <[email protected]> Authored: Mon May 14 08:42:27 2018 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Mon May 14 08:42:27 2018 -0700 ---------------------------------------------------------------------- gobblin-example/build.gradle | 1 + .../EmbeddedGithubJsonToParquet.java | 169 +++++++++++++++++++ .../GithubDataEventTypesPartitioner.java | 54 ++++++ .../main/resources/githubjsontoparquet.template | 29 ++++ 4 files changed, 253 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249d5a19/gobblin-example/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-example/build.gradle b/gobblin-example/build.gradle index d3cec0a..441ea8e 100644 --- a/gobblin-example/build.gradle +++ b/gobblin-example/build.gradle @@ -25,6 +25,7 @@ dependencies { runtime project(":gobblin-modules:gobblin-kafka-08") runtime project(":gobblin-modules:gobblin-eventhub") compile project(":gobblin-modules:gobblin-http") + compile project(":gobblin-modules:gobblin-parquet") compile externalDependency.avro compile externalDependency.commonsLang3 compile externalDependency.guava http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249d5a19/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/EmbeddedGithubJsonToParquet.java 
---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/EmbeddedGithubJsonToParquet.java b/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/EmbeddedGithubJsonToParquet.java new file mode 100644 index 0000000..ddc71bb --- /dev/null +++ b/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/EmbeddedGithubJsonToParquet.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.example.githubjsontoparquet; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.apache.commons.cli.CommandLine; +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.JobTemplate; +import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.cli.CliObjectSupport; +import org.apache.gobblin.runtime.cli.PublicMethodsGobblinCliFactory; +import org.apache.gobblin.runtime.embedded.EmbeddedGobblin; +import org.apache.gobblin.runtime.template.ResourceBasedJobTemplate; +import org.apache.gobblin.writer.WriterOutputFormat; +import org.codehaus.plexus.util.FileUtils; +import org.mortbay.log.Log; + +import groovy.util.logging.Slf4j; + + +/** + * Creates a CLI application for running app githubjsontoparquet. 
+ * Cli application takes two arguments: + * 1st arg: Date time (yyyy-mm-dd-hh) of archive to pull, ex: 2015-01-01-25 + * 2nd arg: Work dir with filesystem URI (file:///home/someuser/somefolder)"; + * Run using: + * bin/gobblin run githubjsontoparquet 2017-12-14-15 file:///Users/someuser/somefolder + * @author tilakpatidar + */ +public class EmbeddedGithubJsonToParquet extends EmbeddedGobblin { + + private static final String GITHUB_ARCHIVE_URL_TEMPLATE = "http://data.githubarchive.org/%s.json.gz"; + private static final String DOWNLOAD_DIR = "archives"; + private static final String ARCHIVE_SUFFIX = ".json.gz"; + private static final String WORK_DIR_KEY = "work.dir"; + + @Slf4j + @Alias(value = "githubjsontoparquet", description = "Extract Github data and write to parquet files") + public static class CliFactory extends PublicMethodsGobblinCliFactory { + + public CliFactory() { + super(EmbeddedGithubJsonToParquet.class); + } + + @Override + public EmbeddedGobblin constructEmbeddedGobblin(CommandLine cli) + throws JobTemplate.TemplateException, IOException { + String[] args = cli.getArgs(); + if (args.length < 1) { + throw new RuntimeException("Expected 2 arguments. 
" + getUsageString()); + } + try { + if (args.length == 2) { + return new EmbeddedGithubJsonToParquet(args[0], args[1]); + } + } catch (JobTemplate.TemplateException | IOException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public String getUsageString() { + return "<Date time (yyyy-mm-dd-hh) of archive to pull> <Work dir with file system URI>"; + } + } + + @CliObjectSupport(argumentNames = {"archiveDateAndHour", "workDir"}) + public EmbeddedGithubJsonToParquet(String archiveDateAndHour, String workDir) + throws JobTemplate.TemplateException, IOException { + super("githubjsontoparquet"); + URL workDirUrl; + try { + workDirUrl = new URL(workDir); + } catch (MalformedURLException e) { + e.printStackTrace(); + throw new RuntimeException("Work directory URI with no protocol or malformed."); + } + + // Set configuration + String fsProtocol = workDirUrl.getProtocol() + ":///"; + this.setConfiguration(WORK_DIR_KEY, workDir); + this.setConfiguration(ConfigurationKeys.FS_URI_KEY, fsProtocol); + this.setConfiguration(ConfigurationKeys.STATE_STORE_ENABLED, "true"); + this.setConfiguration(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, workDir + "/store"); + this.setConfiguration(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, fsProtocol); + this.setConfiguration(ConfigurationKeys.DATA_PUBLISHER_FILE_SYSTEM_URI, fsProtocol); + this.setConfiguration(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, workDir + "/event_data"); + this.setConfiguration(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_DIR, workDir + "/metadata"); + this.setConfiguration(ConfigurationKeys.WRITER_OUTPUT_FORMAT_KEY, WriterOutputFormat.PARQUET.toString()); + + //Set template + try { + setTemplate(ResourceBasedJobTemplate.forResourcePath("githubjsontoparquet.template")); + } catch (URISyntaxException | SpecNotFoundException e) { + e.printStackTrace(); + throw new RuntimeException("Cannot set template"); + } + + // Download the archive + String fileUrl = String.format(GITHUB_ARCHIVE_URL_TEMPLATE, 
archiveDateAndHour); + Path downloadDirPath = createDownloadDir(workDirUrl.getPath(), fileUrl); + Path downloadFile = getAbsoluteDownloadFilePath(downloadDirPath, archiveDateAndHour); + downloadFile(fileUrl, downloadFile); + } + + private Path getAbsoluteDownloadFilePath(Path downloadDirPath, String archiveDateAndHour) { + String downloadFileName = archiveDateAndHour + ARCHIVE_SUFFIX; + return Paths.get(downloadDirPath.toString(), downloadFileName); + } + + private Path createDownloadDir(String workDir, String fileUrl) { + Path downloadDirPath = Paths.get(workDir, DOWNLOAD_DIR); + File downloadDirFile = downloadDirPath.toFile(); + try { + Log.info(String.format("Creating download dir %s", downloadDirFile.toPath().toString())); + FileUtils.forceMkdir(downloadDirFile); + } catch (IOException e) { + throw new RuntimeException(String + .format("Unable to create download location for archive: %s at %s", fileUrl, downloadDirPath.toString())); + } + Log.info(String.format("Created download dir %s", downloadDirFile.toPath().toString())); + return downloadDirPath; + } + + private void downloadFile(String fileUrl, Path destination) { + if (destination.toFile().exists()) { + Log.info(String.format("Skipping download for %s at %s because destination already exists", fileUrl, + destination.toString())); + return; + } + + try { + URL archiveUrl = new URL(fileUrl); + ReadableByteChannel rbc = Channels.newChannel(archiveUrl.openStream()); + FileOutputStream fos = new FileOutputStream(String.valueOf(destination)); + Log.info(String.format("Downloading %s at %s", fileUrl, destination.toString())); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + Log.info(String.format("Download complete for %s at %s", fileUrl, destination.toString())); + } catch (IOException e) { + e.printStackTrace(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249d5a19/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/GithubDataEventTypesPartitioner.java ---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/GithubDataEventTypesPartitioner.java b/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/GithubDataEventTypesPartitioner.java new file mode 100644 index 0000000..a747ed2 --- /dev/null +++ b/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/GithubDataEventTypesPartitioner.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.example.githubjsontoparquet; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.converter.parquet.ParquetGroup; +import org.apache.gobblin.writer.partitioner.WriterPartitioner; + + +/** + * Partitioner for github json records based on 'PARTITION_KEY' key. 
+ * @author tilakpatidar + */ +public class GithubDataEventTypesPartitioner implements WriterPartitioner<ParquetGroup> { + + private static final String PARTITION_KEY = "type"; + private static final Schema SCHEMA = + SchemaBuilder.record("Schema").namespace("gobblin.writer.partitioner").fields().name(PARTITION_KEY)) + .type(Schema.create(Schema.Type.STRING)).noDefault().endRecord(); + + public GithubDataEventTypesPartitioner(State state, int numBranches, int branchId) { + } + + @Override + public Schema partitionSchema() { + return SCHEMA; + } + + @Override + public GenericRecord partitionForRecord(ParquetGroup record) { + GenericRecord partition = new GenericData.Record(SCHEMA); + partition.put(PARTITION_KEY, record.getString(PARTITION_KEY, 0).replace("\"", "")); + return partition; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249d5a19/gobblin-example/src/main/resources/githubjsontoparquet.template ---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/resources/githubjsontoparquet.template b/gobblin-example/src/main/resources/githubjsontoparquet.template new file mode 100644 index 0000000..63f67c7 --- /dev/null +++ b/gobblin-example/src/main/resources/githubjsontoparquet.template @@ -0,0 +1,29 @@ +source.class=org.apache.gobblin.source.extractor.filebased.TextFileBasedSource +source.filebased.data.directory=${work.dir}/archives +source.max.number.of.partitions=10 + 
+source.schema="[{"columnName":"id","isNullable":true,"dataType":{"type":"long"}},{"columnName":"type","isNullable":true,"dataType":{"type":"enum","name":"eventTypes","symbols":["CreateEvent","WatchEvent","PushEvent","ReleaseEvent","IssuesEvent","PullRequestEvent","ForkEvent","GollumEvent","IssueCommentEvent","DeleteEvent","PullRequestReviewCommentEvent","CommitCommentEvent","MemberEvent","PublicEvent"]}},{"columnName":"actor","dataType":{"type":"record","name":"actorDetails","values":[{"columnName":"id","isNullable":true,"dataType":{"type":"long"}},{"columnName":"login","isNullable":true,"dataType":{"type":"string"}},{"columnName":"gravatar_id","isNullable":true,"dataType":{"type":"string"}},{"columnName":"url","isNullable":true,"dataType":{"type":"string"}},{"columnName":"avatar_url","isNullable":true,"dataType":{"type":"string"}}]}},{"columnName":"repo","dataType":{"type":"record","name":"repoDetails","values":[{"columnName":"id","isNullable":true,"dataType":{"type":"long"}},{"columnName":"name","isNullable":true,"dataType":{"type":"string"}},{"columnName":"url","isNullable":true,"dataType":{"type":"string"}},{"columnName":"urlid","isNullable":true,"dataType":{"type":"string"}}]}},{"columnName":"payload","dataType":{"type":"record","name":"payloadDetails","values":[{"columnName":"id","isNullable":true,"dataType":{"type":"long"}},{"columnName":"ref","isNullable":true,"dataType":{"type":"string"}},{"columnName":"ref_type","isNullable":true,"dataType":{"type":"string"}},{"columnName":"master_branch","isNullable":true,"dataType":{"type":"string"}},{"columnName":"description","isNullable":true,"dataType":{"type":"string"}},{"columnName":"pusher_type","isNullable":true,"dataType":{"type":"string"}},{"columnName":"before","isNullable":true,"dataType":{"type":"string"}},{"columnName":"action","isNullable":true,"dataType":{"type":"enum","name":"actionType","symbols":["started","published","opened","closed","created","reopened","added"]}}]}},{"columnName":"public","isNullable":true,"dataType":{"type":"boolean"}},{"columnName":"created_at","isNullable":true,"dataType":{"type":"string"}},{"columnName":"created_at_id","isNullable":true,"dataType":{"type":"string"}}]" + +converter.classes="org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter,org.apache.gobblin.converter.parquet.JsonIntermediateToParquetGroupConverter" + +extract.table.name=EventData +extract.namespace=org.apache.gobblin.example +extract.table.type=APPEND_ONLY +gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas=true + + +fs.uri="file:///" +state.store.enabled=true +state.store.fs.uri=${fs.uri} +state.store.dir="${work.dir}/store" + + +writer.destination.type=HDFS +writer.output.format=PARQUET +writer.fs.uri=${fs.uri} +writer.partitioner.class=org.apache.gobblin.example.githubjsontoparquet.GithubDataEventTypesPartitioner +writer.builder.class=org.apache.gobblin.writer.ParquetDataWriterBuilder +data.publisher.fs.uri=${fs.uri} +data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher +data.publisher.final.dir="${work.dir}/event_data" +data.publisher.metadata.output.dir="${work.dir}/metadata" \ No newline at end of file
