This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 28ea2a5 [GOBBLIN-1228] Do not localize token file on new TaskRunner
launch
28ea2a5 is described below
commit 28ea2a5df3d5fb237dcf309253f50984e375304e
Author: sv2000 <[email protected]>
AuthorDate: Mon Aug 10 14:14:26 2020 -0700
[GOBBLIN-1228] Do not localize token file on new TaskRunner launch
Closes #3074 from sv2000/tokenFileLocalization
---
.../gobblin/yarn/GobblinApplicationMaster.java | 27 +++----
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 3 -
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 28 ++++----
.../apache/gobblin/yarn/GobblinYarnTestUtils.java | 78 +++++++++++++++++++++
.../apache/gobblin/yarn/YarnHelixUtilsTest.java | 54 ++++++++++++++
gobblin-yarn/src/test/resources/..token.crc | Bin 0 -> 12 bytes
gobblin-yarn/src/test/resources/.token | Bin 0 -> 40 bytes
7 files changed, 163 insertions(+), 27 deletions(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 2fc36b2..192f86f 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -19,22 +19,12 @@ package org.apache.gobblin.yarn;
import java.util.Collections;
import java.util.List;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinClusterManager;
-import org.apache.gobblin.cluster.GobblinClusterUtils;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
-import org.apache.gobblin.util.logs.LogCopier;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +39,7 @@ import
org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
@@ -57,6 +48,18 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
+
/**
* The Yarn ApplicationMaster class for Gobblin.
@@ -222,7 +225,7 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
//Because AM is restarted with the original AppSubmissionContext, it may
have outdated delegation tokens.
//So the refreshed tokens should be added into the container's UGI
before any HDFS/Hive/RM access is performed.
- YarnHelixUtils.updateToken();
+ YarnHelixUtils.updateToken(GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
Log4jConfigurationHelper.updateLog4jConfiguration(GobblinApplicationMaster.class,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE,
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 9db09a9..f6d7949 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -201,9 +201,6 @@ public class GobblinYarnTaskRunner extends
GobblinTaskRunner {
if (!Strings.isNullOrEmpty(helixInstanceTags)) {
config =
config.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY,
ConfigValueFactory.fromAnyRef(helixInstanceTags));
}
- //Because AM is restarted with the original AppSubmissionContext, it may
have outdated delegation tokens.
- //So the refreshed tokens should be added into the container's UGI
before any HDFS/Hive/RM access is performed.
- YarnHelixUtils.updateToken();
GobblinTaskRunner gobblinTaskRunner =
new GobblinYarnTaskRunner(applicationName, applicationId,
helixInstanceName, containerId, config,
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 27ab4e6..946649a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -19,11 +19,11 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -40,12 +40,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -72,19 +73,22 @@ public class YarnHelixUtils {
}
/**
- * Update {@link Token} with token file in resources.
+ * Update {@link Token} with token file localized by NM.
*
- * @param
+ * @param tokenFileName name of the token file
* @throws IOException
*/
- public static void updateToken() throws IOException{
- File tokenFile = new
File(YarnHelixUtils.class.getClassLoader().getResource(GobblinYarnConfigurationKeys.TOKEN_FILE_NAME).getFile());
- if(tokenFile.exists()) {
- Credentials credentials = Credentials.readTokenStorageFile(tokenFile,
new Configuration());
- for (Token<? extends TokenIdentifier> token :
credentials.getAllTokens()) {
- LOGGER.info("updating " + token.getKind() + " " + token.getService());
+ public static void updateToken(String tokenFileName) throws IOException{
+ URL tokenFileUrl =
YarnHelixUtils.class.getClassLoader().getResource(tokenFileName);
+ if (tokenFileUrl != null) {
+ File tokenFile = new File(tokenFileUrl.getFile());
+ if (tokenFile.exists()) {
+ Credentials credentials = Credentials.readTokenStorageFile(tokenFile,
new Configuration());
+ for (Token<? extends TokenIdentifier> token :
credentials.getAllTokens()) {
+ LOGGER.info("updating " + token.getKind() + " " +
token.getService());
+ }
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
}
- UserGroupInformation.getCurrentUser().addCredentials(credentials);
}
}
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
new file mode 100644
index 0000000..9bb5b74
--- /dev/null
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test utilities for building mock file systems and delegation token files.
+ */
+public class GobblinYarnTestUtils {
+ /**
+ * A utility method for generating a {@link
org.apache.hadoop.fs.FileSystemTestHelper.MockFileSystem} instance
+ * that can return a delegation token on {@link
org.apache.hadoop.fs.FileSystem#getDelegationToken(String)}.
+ *
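+ * @param service service name the mock reports and sets on each issued token
+ * @return the mock file system stubbed to issue delegation tokens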
+ * @throws IOException
+ */
+ public static FileSystemTestHelper.MockFileSystem
createFileSystemForServiceName(final String service)
+ throws IOException {
+ FileSystemTestHelper.MockFileSystem mockFs = new
FileSystemTestHelper.MockFileSystem();
+ Mockito.when(mockFs.getCanonicalServiceName()).thenReturn(service);
+
Mockito.when(mockFs.getDelegationToken(Mockito.any(String.class))).thenAnswer(new
Answer<Token<?>>() {
+ int unique = 0;
+ @Override
+ public Token<?> answer(InvocationOnMock invocation) throws Throwable {
+ Token<?> token = new Token<TokenIdentifier>();
+ token.setService(new Text(service));
+ // use unique value so when we restore from token storage, we can
+ // tell if it's really the same token
+ token.setKind(new Text("token" + unique++));
+ return token;
+ }
+ });
+ return mockFs;
+ }
+
+ /**
+ * Writes a token file to a given path.
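+ * @param path destination path of the token storage file
+ * @param serviceName service name assigned to the generated token
+ * @throws IOException if creating the token or writing the file fails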
+ */
+ public static void createTokenFileForService(Path path, String serviceName)
+ throws IOException {
+ FileSystem fileSystem = createFileSystemForServiceName(serviceName);
+ Token<?> token = fileSystem.getDelegationToken(serviceName);
+ Credentials credentials = new Credentials();
+ credentials.addToken(token.getService(), token);
+ credentials.writeTokenStorageFile(path, new Configuration());
+ }
+}
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
new file mode 100644
index 0000000..033a24a
--- /dev/null
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+
+public class YarnHelixUtilsTest {
+ /**
+ * Uses the token file created using {@link
GobblinYarnTestUtils#createTokenFileForService(Path, String)} method and
+ * added to the resources folder.
+ * @throws IOException
+ */
+ @Test
+ public void testUpdateToken()
+ throws IOException {
+ //Ensure the credentials are empty on start
+ Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
+ Assert.assertNull(credentials.getToken(new Text("testService")));
+
+ //Attempt reading a non-existent token file and ensure credentials object
has no tokens
+ YarnHelixUtils.updateToken(".token1");
+ credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ Assert.assertNull(credentials.getToken(new Text("testService")));
+
+ //Read a valid token file and ensure the credentials object has a valid
token
+ YarnHelixUtils.updateToken(GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+ credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ Token<?> readToken = credentials.getToken(new Text("testService"));
+ Assert.assertNotNull(readToken);
+ }
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/test/resources/..token.crc
b/gobblin-yarn/src/test/resources/..token.crc
new file mode 100644
index 0000000..b73af16
Binary files /dev/null and b/gobblin-yarn/src/test/resources/..token.crc differ
diff --git a/gobblin-yarn/src/test/resources/.token
b/gobblin-yarn/src/test/resources/.token
new file mode 100644
index 0000000..1b3fb29
Binary files /dev/null and b/gobblin-yarn/src/test/resources/.token differ