This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new edb6b14  [GOBBLIN-1366] Add an option to skip initialization of hadoop tokens …
edb6b14 is described below

commit edb6b1478edca31d4ec61327451bfb8662c2a5dc
Author: Hung Tran <[email protected]>
AuthorDate: Wed Jan 20 21:00:52 2021 -0800

    [GOBBLIN-1366] Add an option to skip initialization of hadoop tokens …
    
    Closes #3208 from htran1/token_option
---
 .../apache/gobblin/azkaban/AzkabanJobLauncher.java | 37 +++++-----
 .../gobblin/azkaban/AzkabanJobLauncherTest.java    | 80 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 16 deletions(-)

diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index b98fd81..c43360d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -112,6 +112,9 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
   private static final String AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS = "gobblin.azkaban.SLAInSeconds";
   private static final String DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS = "-1"; // No SLA.
 
+  public static final String GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS = "gobblin.azkaban.initializeHadoopTokens";
+  public static final String DEFAULT_GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS = "true";
+
   private final Closer closer = Closer.create();
   private final JobLauncher jobLauncher;
   private final JobListener jobListener;
@@ -169,22 +172,24 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
     this.props
         .setProperty(ConfigurationKeys.JOB_TRACKING_URL_KEY, Strings.nullToEmpty(conf.get(AZKABAN_LINK_JOBEXEC_URL)));
 
-    if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
-      LOG.info("Job type " + props.getProperty(JOB_TYPE) + " provided Hadoop token in the environment variable "
-          + HADOOP_TOKEN_FILE_LOCATION);
-      this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, System.getenv(HADOOP_TOKEN_FILE_LOCATION));
-    } else {
-      // see javadoc for more information
-      LOG.info("Job type " + props.getProperty(JOB_TYPE) + " did not provide Hadoop token in the environment variable "
-          + HADOOP_TOKEN_FILE_LOCATION + ". Negotiating Hadoop tokens.");
-
-      File tokenFile = File.createTempFile("mr-azkaban", ".token");
-      TokenUtils.getHadoopTokens(new State(props), Optional.of(tokenFile), new Credentials());
-
-      System.setProperty(HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
-      System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());
-      this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());
-      this.props.setProperty("env." + HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
+    if (Boolean.parseBoolean(this.props.getProperty(GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS,
+        DEFAULT_GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS))) {
+      if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+        LOG.info("Job type " + props.getProperty(JOB_TYPE) + " provided Hadoop token in the environment variable " + HADOOP_TOKEN_FILE_LOCATION);
+        this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, System.getenv(HADOOP_TOKEN_FILE_LOCATION));
+      } else {
+        // see javadoc for more information
+        LOG.info(
+            "Job type " + props.getProperty(JOB_TYPE) + " did not provide Hadoop token in the environment variable " + HADOOP_TOKEN_FILE_LOCATION + ". Negotiating Hadoop tokens.");
+
+        File tokenFile = File.createTempFile("mr-azkaban", ".token");
+        TokenUtils.getHadoopTokens(new State(props), Optional.of(tokenFile), new Credentials());
+
+        System.setProperty(HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
+        System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());
+        this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());
+        this.props.setProperty("env." + HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
+      }
     }
 
     Properties jobProps = this.props;
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanJobLauncherTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanJobLauncherTest.java
new file mode 100644
index 0000000..4349f29
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanJobLauncherTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.azkaban;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.JobLauncherFactory;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test
+public class AzkabanJobLauncherTest {
+
+  @Test
+  public void testDisableTokenInitialization() throws Exception {
+    Properties props = new Properties();
+
+    props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
+    props.setProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY, JobLauncherFactory.JobLauncherType.LOCAL.name());
+    props.setProperty(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, "false");
+    props.setProperty(ConfigurationKeys.STATE_STORE_ENABLED, "false");
+    props.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, DummySource.class.getName());
+
+    // Should get an error since tokens are initialized by default
+    try {
+      new AzkabanJobLauncher("test", props);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("Missing required property keytab.user"));
+    }
+
+    // No error expected since initialization is skipped
+    props.setProperty(AzkabanJobLauncher.GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS, "false");
+    new AzkabanJobLauncher("test", props);
+  }
+
+  /**
+   * A dummy implementation of {@link Source}.
+   */
+  public static class DummySource extends AbstractSource<String, Integer> {
+    @Override
+    public List<WorkUnit> getWorkunits(SourceState sourceState) {
+      return Collections.EMPTY_LIST;
+    }
+
+    @Override
+    public Extractor<String, Integer> getExtractor(WorkUnitState state) throws IOException {
+      return null;
+    }
+
+    @Override
+    public void shutdown(SourceState state) {
+    }
+  }
+}

Reply via email to