This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new edb6b14 [GOBBLIN-1366] Add an option to skip initialization of hadoop tokens …
edb6b14 is described below
commit edb6b1478edca31d4ec61327451bfb8662c2a5dc
Author: Hung Tran <[email protected]>
AuthorDate: Wed Jan 20 21:00:52 2021 -0800
[GOBBLIN-1366] Add an option to skip initialization of hadoop tokens …
Closes #3208 from htran1/token_option
---
.../apache/gobblin/azkaban/AzkabanJobLauncher.java | 37 +++++-----
.../gobblin/azkaban/AzkabanJobLauncherTest.java | 80 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 16 deletions(-)
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index b98fd81..c43360d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -112,6 +112,9 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
private static final String AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS =
"gobblin.azkaban.SLAInSeconds";
private static final String DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS =
"-1"; // No SLA.
+ public static final String GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS =
"gobblin.azkaban.initializeHadoopTokens";
+ public static final String DEFAULT_GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS
= "true";
+
private final Closer closer = Closer.create();
private final JobLauncher jobLauncher;
private final JobListener jobListener;
@@ -169,22 +172,24 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
this.props
.setProperty(ConfigurationKeys.JOB_TRACKING_URL_KEY,
Strings.nullToEmpty(conf.get(AZKABAN_LINK_JOBEXEC_URL)));
- if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
- LOG.info("Job type " + props.getProperty(JOB_TYPE) + " provided Hadoop
token in the environment variable "
- + HADOOP_TOKEN_FILE_LOCATION);
- this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY,
System.getenv(HADOOP_TOKEN_FILE_LOCATION));
- } else {
- // see javadoc for more information
- LOG.info("Job type " + props.getProperty(JOB_TYPE) + " did not provide
Hadoop token in the environment variable "
- + HADOOP_TOKEN_FILE_LOCATION + ". Negotiating Hadoop tokens.");
-
- File tokenFile = File.createTempFile("mr-azkaban", ".token");
- TokenUtils.getHadoopTokens(new State(props), Optional.of(tokenFile), new
Credentials());
-
- System.setProperty(HADOOP_TOKEN_FILE_LOCATION,
tokenFile.getAbsolutePath());
- System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY,
tokenFile.getAbsolutePath());
- this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY,
tokenFile.getAbsolutePath());
- this.props.setProperty("env." + HADOOP_TOKEN_FILE_LOCATION,
tokenFile.getAbsolutePath());
+ if
(Boolean.parseBoolean(this.props.getProperty(GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS,
+ DEFAULT_GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS))) {
+ if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+ LOG.info("Job type " + props.getProperty(JOB_TYPE) + " provided Hadoop
token in the environment variable " + HADOOP_TOKEN_FILE_LOCATION);
+ this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY,
System.getenv(HADOOP_TOKEN_FILE_LOCATION));
+ } else {
+ // see javadoc for more information
+ LOG.info(
+ "Job type " + props.getProperty(JOB_TYPE) + " did not provide
Hadoop token in the environment variable " + HADOOP_TOKEN_FILE_LOCATION + ".
Negotiating Hadoop tokens.");
+
+ File tokenFile = File.createTempFile("mr-azkaban", ".token");
+ TokenUtils.getHadoopTokens(new State(props), Optional.of(tokenFile),
new Credentials());
+
+ System.setProperty(HADOOP_TOKEN_FILE_LOCATION,
tokenFile.getAbsolutePath());
+ System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY,
tokenFile.getAbsolutePath());
+ this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY,
tokenFile.getAbsolutePath());
+ this.props.setProperty("env." + HADOOP_TOKEN_FILE_LOCATION,
tokenFile.getAbsolutePath());
+ }
}
Properties jobProps = this.props;
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanJobLauncherTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanJobLauncherTest.java
new file mode 100644
index 0000000..4349f29
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanJobLauncherTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.azkaban;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.JobLauncherFactory;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test
+public class AzkabanJobLauncherTest {
+
+ @Test
+ public void testDisableTokenInitialization() throws Exception {
+ Properties props = new Properties();
+
+ props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
+ props.setProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY,
JobLauncherFactory.JobLauncherType.LOCAL.name());
+ props.setProperty(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, "false");
+ props.setProperty(ConfigurationKeys.STATE_STORE_ENABLED, "false");
+ props.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY,
DummySource.class.getName());
+
+ // Should get an error since tokens are initialized by default
+ try {
+ new AzkabanJobLauncher("test", props);
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().contains("Missing required property
keytab.user"));
+ }
+
+ // No error expected since initialization is skipped
+
props.setProperty(AzkabanJobLauncher.GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS,
"false");
+ new AzkabanJobLauncher("test", props);
+ }
+
+ /**
+ * A dummy implementation of {@link Source}.
+ */
+ public static class DummySource extends AbstractSource<String, Integer> {
+ @Override
+ public List<WorkUnit> getWorkunits(SourceState sourceState) {
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public Extractor<String, Integer> getExtractor(WorkUnitState state) throws
IOException {
+ return null;
+ }
+
+ @Override
+ public void shutdown(SourceState state) {
+ }
+ }
+}