Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 62c2d2d6f -> f19f76c7c


[GOBBLIN-610] Add support for secure access to Git in GitMonitoringService.

Closes #2476 from sv2000/gitCredentialsProvider


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f19f76c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f19f76c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f19f76c7

Branch: refs/heads/master
Commit: f19f76c7cc6f626880d84fa821c82161d2f3af09
Parents: 62c2d2d
Author: suvasude <[email protected]>
Authored: Mon Oct 15 13:09:10 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Oct 15 13:09:10 2018 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   8 +
 .../service/modules/core/GitConfigMonitor.java  |   2 +-
 .../modules/core/GitFlowGraphMonitor.java       |   2 +-
 .../modules/core/GitMonitoringService.java      | 169 ++++++++++++++++---
 gradle/scripts/dependencyDefinitions.gradle     |   4 +-
 5 files changed, 157 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f19f76c7/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b624511..480796f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -913,4 +913,12 @@ public class ConfigurationKeys {
   public static final String GIT_MONITOR_CONFIG_BASE_DIR = 
"configBaseDirectory";
   public static final String GIT_MONITOR_POLLING_INTERVAL = "pollingInterval";
   public static final String GIT_MONITOR_BRANCH_NAME = "branchName";
+  //Configuration keys for authentication using HTTPS.
+  //Used only when SSH public-key authentication is disabled. A UsernamePasswordCredentialsProvider is built
+  //only when both username and password are non-empty; otherwise CredentialsProvider.getDefault() is used.
+  public static final String GIT_MONITOR_USERNAME = "username";
+  //The configured value is passed through PasswordManager.readPassword() before use
+  //(presumably so it can be stored encrypted).
+  public static final String GIT_MONITOR_PASSWORD = "password";
+  //Configuration keys for authentication using SSH with Public Key.
+  //Boolean flag (default false); when true, SSH public-key authentication replaces username/password.
+  public static final String GIT_MONITOR_SSH_WITH_PUBLIC_KEY_ENABLED = 
"isSshWithPublicKeyEnabled";
+  //Path to the private key file; mandatory when SSH public-key authentication is enabled.
+  public static final String GIT_MONITOR_PRIVATE_KEY_PATH = "privateKeyPath";
+  //Optional passphrase for the private key; passed through PasswordManager.readPassword() before use.
+  public static final String GIT_MONITOR_SSH_PASSPHRASE = "passphrase";
+  //Boolean flag (default false); when true, a JSch logger is installed for SSH diagnostics.
+  public static final String GIT_MONITOR_JSCH_LOGGER_ENABLED = 
"isJschLoggerEnabled";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f19f76c7/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 9fd0555..4fe03da 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -46,7 +46,7 @@ import org.apache.gobblin.util.PullFileLoader;
  */
 @Slf4j
 public class GitConfigMonitor extends GitMonitoringService {
-  public static final String GIT_CONFIG_MONITOR_PREFIX = "gitConfigMonitor";
+  public static final String GIT_CONFIG_MONITOR_PREFIX = 
"gobblin.service.gitConfigMonitor";
 
   private static final String SPEC_DESCRIPTION = "Git-based flow config";
   private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f19f76c7/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index c2afa41..ed16c39 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -56,7 +56,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  */
 @Slf4j
 public class GitFlowGraphMonitor extends GitMonitoringService {
-  public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = 
"gitFlowGraphMonitor";
+  public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = 
"gobblin.service.gitFlowGraphMonitor";
 
   private static final String PROPERTIES_EXTENSIONS = "properties";
   private static final String CONF_EXTENSIONS = StringUtils.EMPTY;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f19f76c7/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index 182fade..11694f2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -21,7 +21,9 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -30,29 +32,45 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.api.CloneCommand;
+import org.eclipse.jgit.api.FetchCommand;
 import org.eclipse.jgit.api.Git;
 import org.eclipse.jgit.api.ResetCommand;
+import org.eclipse.jgit.api.TransportConfigCallback;
 import org.eclipse.jgit.api.errors.GitAPIException;
 import org.eclipse.jgit.diff.DiffEntry;
 import org.eclipse.jgit.errors.RepositoryNotFoundException;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectReader;
 import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.CredentialsProvider;
+import org.eclipse.jgit.transport.JschConfigSessionFactory;
+import org.eclipse.jgit.transport.OpenSshConfig;
+import org.eclipse.jgit.transport.SshSessionFactory;
+import org.eclipse.jgit.transport.SshTransport;
+import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
 import org.eclipse.jgit.treewalk.CanonicalTreeParser;
+import org.eclipse.jgit.util.FS;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractIdleService;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
 import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.PullFileLoader;
 
@@ -62,23 +80,26 @@ public abstract class GitMonitoringService extends AbstractIdleService {
   private static final String REMOTE_NAME = "origin";
   private static final int TERMINATION_TIMEOUT = 30;
 
-  public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
-  public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
-  public static final String SHOULD_CHECKPOINT_HASHES = 
"shouldCheckpointHashes";
-
-  private Integer pollingInterval;
-  protected final ScheduledExecutorService scheduledExecutor;
-  protected GitMonitoringService.GitRepository gitRepo;
-  protected String repositoryDir;
-  protected String folderName;
-  protected Path folderPath;
-  protected final PullFileLoader pullFileLoader;
-  protected final Set<String> javaPropsExtensions;
-  protected final Set<String> hoconFileExtensions;
-  private final boolean shouldCheckpointHashes;
+  static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
+  static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+  static final String SHOULD_CHECKPOINT_HASHES = "shouldCheckpointHashes";
+
+  private final Integer pollingInterval;
+  private final ScheduledExecutorService scheduledExecutor;
+
+  private String privateKeyPath;
+  private String passphrase;
+  private boolean isJschLoggerEnabled;
+
+  final GitMonitoringService.GitRepository gitRepo;
+  final String repositoryDir;
+  final String folderName;
+  final PullFileLoader pullFileLoader;
+  final Set<String> javaPropsExtensions;
+
   protected volatile boolean isActive = false;
 
-  public GitMonitoringService(Config config) {
+  GitMonitoringService(Config config) {
     
Preconditions.checkArgument(config.hasPath(ConfigurationKeys.GIT_MONITOR_REPO_URI),
         ConfigurationKeys.GIT_MONITOR_REPO_URI + " needs to be specified.");
 
@@ -87,21 +108,53 @@ public abstract class GitMonitoringService extends AbstractIdleService {
     String branchName = 
config.getString(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME);
     this.pollingInterval = 
config.getInt(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL);
     this.folderName = 
config.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR);
-    this.shouldCheckpointHashes = ConfigUtils.getBoolean(config, 
SHOULD_CHECKPOINT_HASHES, true);
+    boolean shouldCheckpointHashes = ConfigUtils.getBoolean(config, 
SHOULD_CHECKPOINT_HASHES, true);
+
+    PasswordManager passwordManager = 
PasswordManager.getInstance(ConfigUtils.configToState(config));
+    Either<CredentialsProvider, SshSessionFactory> 
providerSessionFactoryEither;
+    boolean isSshWithPublicKeyEnabled = ConfigUtils.getBoolean(config, 
ConfigurationKeys.GIT_MONITOR_SSH_WITH_PUBLIC_KEY_ENABLED, false);
+    if (isSshWithPublicKeyEnabled) {
+      this.privateKeyPath = ConfigUtils.getString(config, 
ConfigurationKeys.GIT_MONITOR_PRIVATE_KEY_PATH, null);
+      if (Strings.isNullOrEmpty(this.privateKeyPath)) {
+        throw new RuntimeException("Path to private key must be provided");
+      }
+      String passPhraseEnc = ConfigUtils.getString(config, 
ConfigurationKeys.GIT_MONITOR_SSH_PASSPHRASE, null);
+      if (passPhraseEnc != null) {
+        this.passphrase = passwordManager.readPassword(passPhraseEnc);
+      }
+      providerSessionFactoryEither = Either.right(getSshSessionFactory());
+      this.isJschLoggerEnabled = ConfigUtils.getBoolean(config, 
ConfigurationKeys.GIT_MONITOR_JSCH_LOGGER_ENABLED, false);
+    } else { //Use CredentialsProvider
+      String username = ConfigUtils.getString(config, 
ConfigurationKeys.GIT_MONITOR_USERNAME, null);
+      String passwordEnc = ConfigUtils.getString(config, 
ConfigurationKeys.GIT_MONITOR_PASSWORD, null);
+      String password = null;
+      if (passwordEnc != null) {
+        password = passwordManager.readPassword(passwordEnc);
+      }
+      CredentialsProvider credentialsProvider;
+      //Instantiate CredentialsProvider if username/password is provided.
+      if (!Strings.isNullOrEmpty(username) && 
!Strings.isNullOrEmpty(password)) {
+        credentialsProvider = new 
UsernamePasswordCredentialsProvider(username, password);
+      } else {
+        credentialsProvider = CredentialsProvider.getDefault();
+      }
+      providerSessionFactoryEither = Either.left(credentialsProvider);
+    }
+
     try {
-      this.gitRepo =
-          new GitMonitoringService.GitRepository(repositoryUri, repositoryDir, 
branchName, shouldCheckpointHashes);
+      this.gitRepo = new GitMonitoringService.GitRepository(repositoryUri, 
repositoryDir, branchName, providerSessionFactoryEither,
+          shouldCheckpointHashes);
     } catch (GitAPIException | IOException e) {
       throw new RuntimeException("Could not open git repository", e);
     }
 
-    this.folderPath = new Path(this.repositoryDir, this.folderName);
+    Path folderPath = new Path(this.repositoryDir, this.folderName);
     this.javaPropsExtensions = 
Sets.newHashSet(config.getString(JAVA_PROPS_EXTENSIONS).split(","));
-    this.hoconFileExtensions = 
Sets.newHashSet(config.getString(HOCON_FILE_EXTENSIONS).split(","));
+    Set<String> hoconFileExtensions = 
Sets.newHashSet(config.getString(HOCON_FILE_EXTENSIONS).split(","));
     try {
-      this.pullFileLoader = new PullFileLoader(this.folderPath,
+      this.pullFileLoader = new PullFileLoader(folderPath,
           FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new 
Configuration()),
-          this.javaPropsExtensions, this.hoconFileExtensions);
+          this.javaPropsExtensions, hoconFileExtensions);
     } catch (IOException e) {
       throw new RuntimeException("Could not create pull file loader", e);
     }
@@ -189,6 +242,7 @@ public abstract class GitMonitoringService extends AbstractIdleService {
     this.scheduledExecutor.awaitTermination(TERMINATION_TIMEOUT, 
TimeUnit.SECONDS);
   }
 
+
   /**
    * Class for managing a git repository
    */
@@ -199,6 +253,8 @@ public abstract class GitMonitoringService extends AbstractIdleService {
     private final String repoDir;
     private final String branchName;
     private final boolean shouldCheckpointHashes;
+    private final Either<CredentialsProvider, SshSessionFactory> 
providerSessionFactoryEither;
+
     private Git git;
     private String lastProcessedGitHash;
     private String latestGitHash;
@@ -208,13 +264,17 @@ public abstract class GitMonitoringService extends AbstractIdleService {
      * @param repoUri URI of repository
      * @param repoDir Directory to hold the local copy of the repository
      * @param branchName Branch name
+     * @param providerSessionFactoryEither Either {@link 
UsernamePasswordCredentialsProvider} or {@link SshSessionFactory}
+     * @param shouldCheckpointHashes a boolean to determine whether to 
checkpoint commit hashes
      * @throws GitAPIException
      * @throws IOException
      */
-    GitRepository(String repoUri, String repoDir, String branchName, boolean 
shouldCheckpointHashes) throws GitAPIException, IOException {
+    GitRepository(String repoUri, String repoDir, String branchName, 
Either<CredentialsProvider, SshSessionFactory>
+        providerSessionFactoryEither, boolean shouldCheckpointHashes) throws 
GitAPIException, IOException {
       this.repoUri = repoUri;
       this.repoDir = repoDir;
       this.branchName = branchName;
+      this.providerSessionFactoryEither = providerSessionFactoryEither;
       this.shouldCheckpointHashes = shouldCheckpointHashes;
 
       initRepository();
@@ -242,6 +302,8 @@ public abstract class GitMonitoringService extends AbstractIdleService {
             .setDirectory(repoDirFile)
             .setURI(this.repoUri)
             .setBranch(this.branchName)
+            .setTransportConfigCallback(buildTransportConfigCallback())
+            .setCredentialsProvider(getCredentialsProvider())
             .call();
       }
 
@@ -307,7 +369,11 @@ public abstract class GitMonitoringService extends AbstractIdleService {
       ObjectId oldHeadTree = 
git.getRepository().resolve(this.lastProcessedGitHash + "^{tree}");
 
       // refresh to latest and reset hard to handle forced pushes
-      this.git.fetch().setRemote(REMOTE_NAME).call();
+      this.git.fetch()
+          .setRemote(REMOTE_NAME)
+          .setCredentialsProvider(getCredentialsProvider())
+          .setTransportConfigCallback(buildTransportConfigCallback())
+          .call();
       // reset hard to get a clean working set since pull --rebase may leave 
files around
       this.git.reset().setMode(ResetCommand.ResetType.HARD).setRef(REMOTE_NAME 
+ "/" + this.branchName).call();
 
@@ -330,6 +396,61 @@ public abstract class GitMonitoringService extends AbstractIdleService {
           .setShowNameAndStatusOnly(true)
           .call();
     }
+
+    /**
+     * Returns the {@link CredentialsProvider} chosen at construction time for username/password (HTTPS) access.
+     *
+     * @return the credentials provider, or {@code null} when SSH public-key authentication is configured, in
+     *         which case authentication is handled by the {@link SshSessionFactory} instead
+     */
+    private CredentialsProvider getCredentialsProvider() {
+      if (this.providerSessionFactoryEither instanceof Either.Right) {
+        // SSH mode: there are no username/password credentials to hand to JGit.
+        return null;
+      }
+      Either.Left<CredentialsProvider, SshSessionFactory> credentialsSide =
+          (Either.Left<CredentialsProvider, SshSessionFactory>) this.providerSessionFactoryEither;
+      return credentialsSide.getLeft();
+    }
+
+    /**
+     * Builds the {@link TransportConfigCallback} passed to clone and fetch so that SSH transports authenticate
+     * with the configured {@link SshSessionFactory}.
+     *
+     * @return {@code null} when username/password ({@link CredentialsProvider}) authentication is configured;
+     *         otherwise a callback that installs the SSH session factory on SSH transports
+     */
+    private TransportConfigCallback buildTransportConfigCallback() {
+      if (this.providerSessionFactoryEither instanceof Either.Left) {
+        return null;
+      }
+
+      SshSessionFactory sshSessionFactory =
+          ((Either.Right<CredentialsProvider, SshSessionFactory>) this.providerSessionFactoryEither).getRight();
+      return transport -> {
+        // The repository URI determines the transport type. Only SSH transports accept a session factory;
+        // an unconditional cast would throw ClassCastException for e.g. an https:// or file:// URI.
+        if (transport instanceof SshTransport) {
+          ((SshTransport) transport).setSshSessionFactory(sshSessionFactory);
+        }
+      };
+    }
+  }
+
+  /**
+   * Creates the {@link SshSessionFactory} used for SSH public-key authentication.
+   *
+   * <p>The private key path, passphrase and JSch-logger flag are read from this service's fields only when the
+   * factory builds its default {@link JSch} instance, not when this method is called. This matters because the
+   * constructor assigns {@code isJschLoggerEnabled} after creating the factory.
+   */
+  private SshSessionFactory getSshSessionFactory() {
+    return new JschConfigSessionFactory() {
+      @Override
+      protected void configure(OpenSshConfig.Host hc, Session session) {
+        // Intentionally empty: no per-host session settings are applied here.
+      }
+
+      @Override
+      protected JSch createDefaultJSch(FS fs) throws JSchException {
+        GitMonitoringService outer = GitMonitoringService.this;
+        if (outer.isJschLoggerEnabled) {
+          // JSch.setLogger is a static setter, so the logger is presumably shared by all JSch users in the JVM.
+          JSch.setLogger(new JschLogger());
+        }
+        JSch jsch = super.createDefaultJSch(fs);
+        // Register the configured private key; the passphrase stays null when none was configured.
+        jsch.addIdentity(outer.privateKeyPath, outer.passphrase);
+        return jsch;
+      }
+    };
+  }
+
+  /**
+   * Adapter that forwards JSch's internal diagnostics to SLF4J instead of printing them to {@code System.err},
+   * so that they follow the service's logging configuration (appenders, levels, rotation).
+   *
+   * <p>JSch levels map to SLF4J levels one-to-one; FATAL is logged at ERROR with a "FATAL: " prefix. JSch
+   * DEBUG output is visible only when DEBUG is enabled for this logger.
+   */
+  private static class JschLogger implements com.jcraft.jsch.Logger {
+    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(JschLogger.class);
+
+    @Override
+    public boolean isEnabled(int level) {
+      switch (level) {
+        case DEBUG:
+          return LOG.isDebugEnabled();
+        case INFO:
+          return LOG.isInfoEnabled();
+        case WARN:
+          return LOG.isWarnEnabled();
+        default: // ERROR, FATAL and any unrecognized level
+          return LOG.isErrorEnabled();
+      }
+    }
+
+    @Override
+    public void log(int level, String message) {
+      switch (level) {
+        case DEBUG:
+          LOG.debug(message);
+          break;
+        case INFO:
+          LOG.info(message);
+          break;
+        case WARN:
+          LOG.warn(message);
+          break;
+        case FATAL:
+          LOG.error("FATAL: {}", message);
+          break;
+        default: // ERROR and any unrecognized level
+          LOG.error(message);
+          break;
+      }
+    }
   }
 
   public abstract boolean shouldPollGit();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f19f76c7/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 23c5a19..f7ff29c 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -73,7 +73,7 @@ ext.externalDependency = [
     "httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
     "httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
     "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
-    "jgit":"org.eclipse.jgit:org.eclipse.jgit:4.8.0.201706111038-r",
+    "jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
     "kafka08": "org.apache.kafka:kafka_2.11:" + kafka08Version,
     "kafka08Test": "org.apache.kafka:kafka_2.11:" + kafka08Version + ":test",
     "kafka08Client": "org.apache.kafka:kafka-clients:" + kafka08Version,
@@ -96,7 +96,7 @@ ext.externalDependency = [
     "metricsCore": "io.dropwizard.metrics:metrics-core:" + 
dropwizardMetricsVersion,
     "metricsJvm": "io.dropwizard.metrics:metrics-jvm:" + 
dropwizardMetricsVersion,
     "metricsGraphite": "io.dropwizard.metrics:metrics-graphite:" + 
dropwizardMetricsVersion,
-    "jsch": "com.jcraft:jsch:0.1.53",
+    "jsch": "com.jcraft:jsch:0.1.54",
     "jdo2": "javax.jdo:jdo2-api:2.1",
     "azkaban": "com.linkedin.azkaban:azkaban:2.5.0",
     "commonsVfs": "org.apache.commons:commons-vfs2:2.0",

Reply via email to