Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ab4fc217d -> 249d5a19c


[GOBBLIN-352] Add GithubJsonDataToParquet example in gobblin-example

Closes #2222 from tilakpatidar/parquet_example


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/249d5a19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/249d5a19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/249d5a19

Branch: refs/heads/master
Commit: 249d5a19c4acc2aa6448c92f7e5a5cac372534ed
Parents: ab4fc21
Author: tilakpatidar <[email protected]>
Authored: Mon May 14 08:42:27 2018 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Mon May 14 08:42:27 2018 -0700

----------------------------------------------------------------------
 gobblin-example/build.gradle                    |   1 +
 .../EmbeddedGithubJsonToParquet.java            | 169 +++++++++++++++++++
 .../GithubDataEventTypesPartitioner.java        |  54 ++++++
 .../main/resources/githubjsontoparquet.template |  29 ++++
 4 files changed, 253 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249d5a19/gobblin-example/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-example/build.gradle b/gobblin-example/build.gradle
index d3cec0a..441ea8e 100644
--- a/gobblin-example/build.gradle
+++ b/gobblin-example/build.gradle
@@ -25,6 +25,7 @@ dependencies {
     runtime project(":gobblin-modules:gobblin-kafka-08")
     runtime project(":gobblin-modules:gobblin-eventhub")
     compile project(":gobblin-modules:gobblin-http")
+    compile project(":gobblin-modules:gobblin-parquet")
     compile externalDependency.avro
     compile externalDependency.commonsLang3
     compile externalDependency.guava

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249d5a19/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/EmbeddedGithubJsonToParquet.java
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/EmbeddedGithubJsonToParquet.java b/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/EmbeddedGithubJsonToParquet.java
new file mode 100644
index 0000000..ddc71bb
--- /dev/null
+++ b/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/EmbeddedGithubJsonToParquet.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.example.githubjsontoparquet;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.cli.CliObjectSupport;
+import org.apache.gobblin.runtime.cli.PublicMethodsGobblinCliFactory;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import org.apache.gobblin.runtime.template.ResourceBasedJobTemplate;
+import org.apache.gobblin.writer.WriterOutputFormat;
+import org.codehaus.plexus.util.FileUtils;
+import org.mortbay.log.Log;
+
+import groovy.util.logging.Slf4j;
+
+
+/**
+ * Embedded Gobblin job that downloads one hourly GitHub Archive dump and writes it out as Parquet,
+ * exposed as the CLI application {@code githubjsontoparquet}.
+ *
+ * <p>The CLI application takes two arguments:
+ * <ol>
+ *   <li>Date time (yyyy-mm-dd-hh) of the archive to pull, ex: 2015-01-01-15</li>
+ *   <li>Work dir with filesystem URI, ex: file:///home/someuser/somefolder</li>
+ * </ol>
+ *
+ * <p>Run using:
+ * <pre>
+ *  bin/gobblin run githubjsontoparquet 2017-12-14-15 file:///Users/someuser/somefolder
+ * </pre>
+ *
+ * @author tilakpatidar
+ */
+public class EmbeddedGithubJsonToParquet extends EmbeddedGobblin {
+
+  /** URL pattern of an hourly archive; {@code %s} is replaced by the date-and-hour argument. */
+  private static final String GITHUB_ARCHIVE_URL_TEMPLATE = "http://data.githubarchive.org/%s.json.gz";
+  /** Name of the sub-directory of the work dir that receives downloaded archives. */
+  private static final String DOWNLOAD_DIR = "archives";
+  private static final String ARCHIVE_SUFFIX = ".json.gz";
+  /** Configuration key under which the work dir is stored (referenced by the job template). */
+  private static final String WORK_DIR_KEY = "work.dir";
+
+  /**
+   * CLI factory that builds an {@link EmbeddedGithubJsonToParquet} from positional arguments.
+   */
+  @Slf4j
+  @Alias(value = "githubjsontoparquet", description = "Extract Github data and write to parquet files")
+  public static class CliFactory extends PublicMethodsGobblinCliFactory {
+
+    public CliFactory() {
+      super(EmbeddedGithubJsonToParquet.class);
+    }
+
+    /**
+     * Builds the embedded job from exactly two positional arguments.
+     *
+     * @param cli parsed command line; its positional arguments are the archive date-hour and the work dir
+     * @return the configured job, never {@code null}
+     * @throws RuntimeException if the number of positional arguments is not 2
+     * @throws JobTemplate.TemplateException propagated from the job constructor
+     * @throws IOException propagated from the job constructor, e.g. when the archive download fails
+     */
+    @Override
+    public EmbeddedGobblin constructEmbeddedGobblin(CommandLine cli)
+        throws JobTemplate.TemplateException, IOException {
+      String[] args = cli.getArgs();
+      // Reject every wrong argument count up front instead of silently returning null.
+      if (args.length != 2) {
+        throw new RuntimeException("Expected 2 arguments. " + getUsageString());
+      }
+      // Failures propagate through the declared exceptions rather than being printed and swallowed.
+      return new EmbeddedGithubJsonToParquet(args[0], args[1]);
+    }
+
+    @Override
+    public String getUsageString() {
+      return "<Date time (yyyy-mm-dd-hh) of archive to pull> <Work dir with file system URI>";
+    }
+  }
+
+  /**
+   * Configures the job for the given archive hour and work directory, then downloads the archive.
+   *
+   * <p>The download happens eagerly, inside this constructor. The archive is written to
+   * {@code <path of workDir>/archives} resolved against the default file system, because only the path
+   * component of the work dir URI is used to locate the download directory.
+   *
+   * @param archiveDateAndHour date and hour of the archive to pull; substituted into the archive URL
+   * @param workDir work directory as a URI that includes a protocol, e.g. {@code file:///tmp/work}
+   * @throws JobTemplate.TemplateException propagated from loading or setting the job template
+   * @throws IOException if the archive cannot be downloaded
+   * @throws RuntimeException if {@code workDir} is not a well-formed URL, the job template cannot be
+   *         resolved, or the download directory cannot be created
+   */
+  @CliObjectSupport(argumentNames = {"archiveDateAndHour", "workDir"})
+  public EmbeddedGithubJsonToParquet(String archiveDateAndHour, String workDir)
+      throws JobTemplate.TemplateException, IOException {
+    super("githubjsontoparquet");
+    URL workDirUrl;
+    try {
+      workDirUrl = new URL(workDir);
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Work directory URI with no protocol or malformed: " + workDir, e);
+    }
+
+    // Set configuration
+    // Only the scheme is kept here, so any authority (host:port) in workDir is not part of the fs URI.
+    String fsProtocol = workDirUrl.getProtocol() + ":///";
+    this.setConfiguration(WORK_DIR_KEY, workDir);
+    this.setConfiguration(ConfigurationKeys.FS_URI_KEY, fsProtocol);
+    this.setConfiguration(ConfigurationKeys.STATE_STORE_ENABLED, "true");
+    this.setConfiguration(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, workDir + "/store");
+    this.setConfiguration(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, fsProtocol);
+    this.setConfiguration(ConfigurationKeys.DATA_PUBLISHER_FILE_SYSTEM_URI, fsProtocol);
+    this.setConfiguration(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, workDir + "/event_data");
+    this.setConfiguration(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_DIR, workDir + "/metadata");
+    this.setConfiguration(ConfigurationKeys.WRITER_OUTPUT_FORMAT_KEY, WriterOutputFormat.PARQUET.toString());
+
+    // Set template
+    try {
+      setTemplate(ResourceBasedJobTemplate.forResourcePath("githubjsontoparquet.template"));
+    } catch (URISyntaxException | SpecNotFoundException e) {
+      throw new RuntimeException("Cannot set template", e);
+    }
+
+    // Download the archive
+    String fileUrl = String.format(GITHUB_ARCHIVE_URL_TEMPLATE, archiveDateAndHour);
+    Path downloadDirPath = createDownloadDir(workDirUrl.getPath(), fileUrl);
+    Path downloadFile = getAbsoluteDownloadFilePath(downloadDirPath, archiveDateAndHour);
+    downloadFile(fileUrl, downloadFile);
+  }
+
+  /**
+   * Returns the path of the archive file for the given hour inside the download directory.
+   */
+  private Path getAbsoluteDownloadFilePath(Path downloadDirPath, String archiveDateAndHour) {
+    String downloadFileName = archiveDateAndHour + ARCHIVE_SUFFIX;
+    return Paths.get(downloadDirPath.toString(), downloadFileName);
+  }
+
+  /**
+   * Creates the {@code archives} directory under the given work dir path.
+   *
+   * @param workDir path component of the work dir URI
+   * @param fileUrl archive URL, used only in the error message
+   * @return the path of the download directory
+   * @throws RuntimeException if the directory cannot be created
+   */
+  private Path createDownloadDir(String workDir, String fileUrl) {
+    Path downloadDirPath = Paths.get(workDir, DOWNLOAD_DIR);
+    File downloadDirFile = downloadDirPath.toFile();
+    try {
+      Log.info(String.format("Creating download dir %s", downloadDirFile.toPath().toString()));
+      FileUtils.forceMkdir(downloadDirFile);
+    } catch (IOException e) {
+      // Keep the cause so the underlying file system error is visible to the caller.
+      throw new RuntimeException(String
+          .format("Unable to create download location for archive: %s at %s", fileUrl, downloadDirPath.toString()),
+          e);
+    }
+    Log.info(String.format("Created download dir %s", downloadDirFile.toPath().toString()));
+    return downloadDirPath;
+  }
+
+  /**
+   * Downloads {@code fileUrl} to {@code destination} unless the destination already exists.
+   *
+   * @param fileUrl URL of the archive to fetch
+   * @param destination local file to write
+   * @throws IOException if the archive cannot be fetched or written; a partially written destination
+   *         file is removed first (best effort)
+   */
+  private void downloadFile(String fileUrl, Path destination) throws IOException {
+    File destinationFile = destination.toFile();
+    if (destinationFile.exists()) {
+      Log.info(String.format("Skipping download for %s at %s because destination already exists", fileUrl,
+          destination.toString()));
+      return;
+    }
+
+    Log.info(String.format("Downloading %s at %s", fileUrl, destination.toString()));
+    // try-with-resources closes both the remote stream and the output file on every path.
+    try (ReadableByteChannel source = Channels.newChannel(new URL(fileUrl).openStream());
+        FileOutputStream sink = new FileOutputStream(destinationFile)) {
+      sink.getChannel().transferFrom(source, 0, Long.MAX_VALUE);
+    } catch (IOException e) {
+      // A leftover partial file would make the next run skip the download (see the exists() check above).
+      if (destinationFile.exists() && !destinationFile.delete()) {
+        Log.warn(String.format("Unable to remove partially downloaded file %s", destination.toString()));
+      }
+      throw new IOException(String.format("Failed to download %s to %s", fileUrl, destination.toString()), e);
+    }
+    Log.info(String.format("Download complete for %s at %s", fileUrl, destination.toString()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249d5a19/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/GithubDataEventTypesPartitioner.java
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/GithubDataEventTypesPartitioner.java b/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/GithubDataEventTypesPartitioner.java
new file mode 100644
index 0000000..a747ed2
--- /dev/null
+++ b/gobblin-example/src/main/java/org/apache/gobblin/example/githubjsontoparquet/GithubDataEventTypesPartitioner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.example.githubjsontoparquet;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.parquet.ParquetGroup;
+import org.apache.gobblin.writer.partitioner.WriterPartitioner;
+
+
+/**
+ * Partitioner for github json records that uses the value of each record's {@code type} field
+ * as the partition.
+ *
+ * @author tilakpatidar
+ */
+public class GithubDataEventTypesPartitioner implements WriterPartitioner<ParquetGroup> {
+
+  /** Name of the record field to partition on; also the single field of the partition schema. */
+  private static final String PARTITION_KEY = "type";
+  /** Partition schema: a record with one required string field named after {@link #PARTITION_KEY}. */
+  private static final Schema SCHEMA =
+      SchemaBuilder.record("Schema").namespace("gobblin.writer.partitioner").fields().name(PARTITION_KEY)
+          .type(Schema.create(Schema.Type.STRING)).noDefault().endRecord();
+
+  /**
+   * NOTE(review): none of the arguments are used; this signature is presumably what the framework
+   * expects when it instantiates partitioners -- confirm before changing it.
+   */
+  public GithubDataEventTypesPartitioner(State state, int numBranches, int branchId) {
+  }
+
+  @Override
+  public Schema partitionSchema() {
+    return SCHEMA;
+  }
+
+  /**
+   * Builds the partition record for one event.
+   *
+   * @param record converted event; the first value of its {@code type} field is read
+   * @return a record conforming to {@link #partitionSchema()} holding the event type
+   */
+  @Override
+  public GenericRecord partitionForRecord(ParquetGroup record) {
+    // Double quotes are stripped from the value so they do not end up in the partition value.
+    String eventType = record.getString(PARTITION_KEY, 0).replace("\"", "");
+    GenericRecord partition = new GenericData.Record(SCHEMA);
+    partition.put(PARTITION_KEY, eventType);
+    return partition;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249d5a19/gobblin-example/src/main/resources/githubjsontoparquet.template
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/githubjsontoparquet.template b/gobblin-example/src/main/resources/githubjsontoparquet.template
new file mode 100644
index 0000000..63f67c7
--- /dev/null
+++ b/gobblin-example/src/main/resources/githubjsontoparquet.template
@@ -0,0 +1,29 @@
+source.class=org.apache.gobblin.source.extractor.filebased.TextFileBasedSource
+source.filebased.data.directory=${work.dir}/archives
+source.max.number.of.partitions=10
+
+source.schema="[{"columnName":"id","isNullable":true,"dataType":{"type":"long"}},{"columnName":"type","isNullable":true,"dataType":{"type":"enum","name":"eventTypes","symbols":["CreateEvent","WatchEvent","PushEvent","ReleaseEvent","IssuesEvent","PullRequestEvent","ForkEvent","GollumEvent","IssueCommentEvent","DeleteEvent","PullRequestReviewCommentEvent","CommitCommentEvent","MemberEvent","PublicEvent"]}},{"columnName":"actor","dataType":{"type":"record","name":"actorDetails","values":[{"columnName":"id","isNullable":true,"dataType":{"type":"long"}},{"columnName":"login","isNullable":true,"dataType":{"type":"string"}},{"columnName":"gravatar_id","isNullable":true,"dataType":{"type":"string"}},{"columnName":"url","isNullable":true,"dataType":{"type":"string"}},{"columnName":"avatar_url","isNullable":true,"dataType":{"type":"string"}}]}},{"columnName":"repo","dataType":{"type":"record","name":"repoDetails","values":[{"columnName":"id","isNullable":true,"dataType":{"type":"long"}},{"columnName":"name","isNullable":true,"dataType":{"type":"string"}},{"columnName":"url","isNullable":true,"dataType":{"type":"string"}},{"columnName":"urlid","isNullable":true,"dataType":{"type":"string"}}]}},{"columnName":"payload","dataType":{"type":"record","name":"payloadDetails","values":[{"columnName":"id","isNullable":true,"dataType":{"type":"long"}},{"columnName":"ref","isNullable":true,"dataType":{"type":"string"}},{"columnName":"ref_type","isNullable":true,"dataType":{"type":"string"}},{"columnName":"master_branch","isNullable":true,"dataType":{"type":"string"}},{"columnName":"description","isNullable":true,"dataType":{"type":"string"}},{"columnName":"pusher_type","isNullable":true,"dataType":{"type":"string"}},{"columnName":"before","isNullable":true,"dataType":{"type":"string"}},{"columnName":"action","isNullable":true,"dataType":{"type":"enum","name":"actionType","symbols":["started","published","opened","closed","created","reopened","added"]}}]}},{"columnName":"public","isNullable":true,"dataType":{"type":"boolean"}},{"columnName":"created_at","isNullable":true,"dataType":{"type":"string"}},{"columnName":"created_at_id","isNullable":true,"dataType":{"type":"string"}}]"
+
+converter.classes="org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter,org.apache.gobblin.converter.parquet.JsonIntermediateToParquetGroupConverter"
+
+extract.table.name=EventData
+extract.namespace=org.apache.gobblin.example
+extract.table.type=APPEND_ONLY
+gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas=true
+
+
+fs.uri="file:///"
+state.store.enabled=true
+state.store.fs.uri=${fs.uri}
+state.store.dir="${work.dir}/store"
+
+
+writer.destination.type=HDFS
+writer.output.format=PARQUET
+writer.fs.uri=${fs.uri}
+writer.partitioner.class=org.apache.gobblin.example.githubjsontoparquet.GithubDataEventTypesPartitioner
+writer.builder.class=org.apache.gobblin.writer.ParquetDataWriterBuilder
+data.publisher.fs.uri=${fs.uri}
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+data.publisher.final.dir="${work.dir}/event_data"
+data.publisher.metadata.output.dir="${work.dir}/metadata"
\ No newline at end of file

Reply via email to