Repository: flink Updated Branches: refs/heads/flip-6 4afcc4abd -> 55e94c3c6
[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e57fba0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e57fba0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e57fba0 Branch: refs/heads/flip-6 Commit: 8e57fba073be139f69e072bdb4888d582fa7211a Parents: e11ea3f Author: shuai.xus <[email protected]> Authored: Thu Nov 3 16:24:47 2016 +0800 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 5 02:49:43 2016 +0100 ---------------------------------------------------------------------- .../resourcemanager/ResourceManager.java | 2 +- .../apache/flink/yarn/YarnResourceManager.java | 552 +++++++++++++++++++ 2 files changed, 553 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8e57fba0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 76b4a86..3bcbfda 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -637,7 +637,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> * * @param t The exception describing the fatal error */ - void onFatalErrorAsync(final Throwable t) { + protected void onFatalErrorAsync(final Throwable t) { runAsync(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/flink/blob/8e57fba0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java new file mode 100644 index 0000000..6280bdf --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; + +/** + * The yarn implementation of the resource manager. Used when the system is started + * via the resource framework YARN. + */ +public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler { + protected final Logger LOG = LoggerFactory.getLogger(getClass()); + + /** The process environment variables */ + private final Map<String, String> ENV; + + /** The heartbeat interval while the resource master is waiting for containers */ + private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500; + + /** The default heartbeat interval during regular operation */ + private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; + + /** The maximum time that TaskExecutors may be waiting to register at the ResourceManager before they quit */ + private static final FiniteDuration TASKEXECUTOR_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** Environment variable name of the final container id used by the YarnResourceManager. + * Container ID generation may vary across Hadoop versions. */ + final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID"; + + /** Environment variable name of the hostname used by the Yarn. + * TaskExecutor use this host name to start port. */ + final static String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID"; + + /** Default heartbeat interval between this resource manager and the YARN ResourceManager */ + private final int yarnHeartbeatIntervalMillis; + + private final Configuration flinkConfig; + + private final YarnConfiguration yarnConfig; + + /** Client to communicate with the Resource Manager (YARN's master) */ + private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient; + + /** Client to communicate with the Node manager and launch TaskExecutor processes */ + private NMClient nodeManagerClient; + + /** The number of containers requested, but not yet granted */ + private int numPendingContainerRequests; + + public YarnResourceManager( + Configuration flinkConfig, + Map<String, String> env, + RpcService rpcService, + ResourceManagerConfiguration resourceManagerConfiguration, + HighAvailabilityServices highAvailabilityServices, + SlotManagerFactory slotManagerFactory, + MetricRegistry metricRegistry, + JobLeaderIdService jobLeaderIdService, + FatalErrorHandler fatalErrorHandler) { + super( + rpcService, + resourceManagerConfiguration, + highAvailabilityServices, + slotManagerFactory, + metricRegistry, + jobLeaderIdService, + fatalErrorHandler); + this.flinkConfig = flinkConfig; + this.yarnConfig = new YarnConfiguration(); + this.ENV = env; + final int yarnHeartbeatIntervalMS = flinkConfig.getInteger( + ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000; + + final long yarnExpiryIntervalMS = yarnConfig.getLong( + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS); + + if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) { + log.warn("The heartbeat interval of the Flink Application master ({}) is greater " + + "than YARN's expiry interval ({}). The application is likely to be killed by YARN.", + yarnHeartbeatIntervalMS, yarnExpiryIntervalMS); + } + yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS; + numPendingContainerRequests = 0; + } + + @Override + protected void initialize() throws ResourceManagerException { + resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this); + resourceManagerClient.init(yarnConfig); + resourceManagerClient.start(); + try { + //TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address + Tuple2<String, Integer> hostPort = parseHostPort(getAddress()); + //TODO: the third paramter should be the webmonitor address + resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress()); + } catch (Exception e) { + LOG.info("registerApplicationMaster fail", e); + } + + // create the client to communicate with the node managers + nodeManagerClient = NMClient.createNMClient(); + nodeManagerClient.init(yarnConfig); + nodeManagerClient.start(); + nodeManagerClient.cleanupRunningContainersOnStop(true); + } + + @Override + public void shutDown() throws Exception { + // shut down all components + if (resourceManagerClient != null) { + try { + resourceManagerClient.stop(); + } catch (Throwable t) { + LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", t); + } + } + if (nodeManagerClient != null) { + try { + nodeManagerClient.stop(); + } catch (Throwable t) { + LOG.error("Could not cleanly shut down the Node Manager Client", t); + } + } + super.shutDown(); + } + + @Override + protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + + // first, de-register from YARN + FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus); + LOG.info("Unregistering application from the YARN Resource Manager"); + try { + resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, ""); + } catch (Throwable t) { + LOG.error("Could not unregister the application master.", t); + } + } + + @Override + public void startNewWorker(ResourceProfile resourceProfile) { + // Priority for worker containers - priorities are intra-application + //TODO: set priority according to the resource allocated + Priority priority = Priority.newInstance(0); + int mem = resourceProfile.getMemoryInMB() <= Integer.MAX_VALUE ? (int)resourceProfile.getMemoryInMB() : Integer.MAX_VALUE; + if (mem < 0) { + mem = 1024; + } + int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores() + 1; + Resource capability = Resource.newInstance(mem , vcore); + requestYarnContainer(capability, priority); + } + + @Override + protected ResourceID workerStarted(ResourceID resourceID) { + return resourceID; + } + + // AMRMClientAsync CallbackHandler methods + @Override + public float getProgress() { + // Temporarily need not record the total size of asked and allocated containers + return 1; + } + + @Override + public void onContainersCompleted(List<ContainerStatus> list) { + for (ContainerStatus container : list) { + if (container.getExitStatus() < 0) { + notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics()); + // TODO: notice job master slot fail + } + } + } + + @Override + public void onContainersAllocated(List<Container> containers) { + for (Container container : containers) { + numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1); + LOG.info("Received new container: {} - Remaining pending container requests: {}", + container.getId(), numPendingContainerRequests); + try { + /** Context information used to start a TaskExecutor Java process */ + ContainerLaunchContext taskExecutorLaunchContext = + createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost()); + nodeManagerClient.startContainer(container, taskExecutorLaunchContext); + } + catch (Throwable t) { + // failed to launch the container, will release the failed one and ask for a new one + LOG.error("Could not start TaskManager in container " + container, t); + resourceManagerClient.releaseAssignedContainer(container.getId()); + requestYarnContainer(container.getResource(), container.getPriority()); + } + } + if (numPendingContainerRequests <= 0) { + resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis); + } + } + + @Override + public void onShutdownRequest() { + // Nothing to do + } + + @Override + public void onNodesUpdated(List<NodeReport> list) { + // We are not interested in node updates + } + + @Override + public void onError(Throwable error) { + onFatalErrorAsync(error); + } + + //Utility methods + /** + * Converts a Flink application status enum to a YARN application status enum. + * @param status The Flink application status. + * @return The corresponding YARN application status. + */ + private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { + if (status == null) { + return FinalApplicationStatus.UNDEFINED; + } + else { + switch (status) { + case SUCCEEDED: + return FinalApplicationStatus.SUCCEEDED; + case FAILED: + return FinalApplicationStatus.FAILED; + case CANCELED: + return FinalApplicationStatus.KILLED; + default: + return FinalApplicationStatus.UNDEFINED; + } + } + } + + // parse the host and port from akka address, + // the akka address is like akka.tcp://[email protected]:49712/user/$a + private static Tuple2<String, Integer> parseHostPort(String address) { + String[] hostPort = address.split("@")[1].split(":"); + String host = hostPort[0]; + String port = hostPort[1].split("/")[0]; + return new Tuple2(host, Integer.valueOf(port)); + } + + private void requestYarnContainer(Resource resource, Priority priority) { + resourceManagerClient.addContainerRequest( + new AMRMClient.ContainerRequest(resource, null, null, priority)); + // make sure we transmit the request fast and receive fast news of granted allocations + resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); + + numPendingContainerRequests++; + LOG.info("Requesting new TaskManager container pending requests: {}", + numPendingContainerRequests); + } + + private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host) + throws Exception { + // init the ContainerLaunchContext + final String currDir = ENV.get(ApplicationConstants.Environment.PWD.key()); + + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 1); + + LOG.info("TaskExecutor{} will be started with container size {} MB, JVM heap size {} MB, " + + "JVM direct memory limit {} MB", + containerId, + taskManagerParameters.taskManagerTotalMemoryMB(), + taskManagerParameters.taskManagerHeapSizeMB(), + taskManagerParameters.taskManagerDirectMemoryLimitMB()); + final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration( + flinkConfig, "", 0, 1, TASKEXECUTOR_REGISTRATION_TIMEOUT); + LOG.debug("TaskManager configuration: {}", taskManagerConfig); + + ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext( + flinkConfig, yarnConfig, ENV, + taskManagerParameters, taskManagerConfig, + currDir, YarnTaskExecutorRunner.class, LOG); + + // set a special environment variable to uniquely identify this container + taskExecutorLaunchContext.getEnvironment() + .put(ENV_FLINK_CONTAINER_ID, containerId); + taskExecutorLaunchContext.getEnvironment() + .put(ENV_FLINK_NODE_ID, host); + return taskExecutorLaunchContext; + } + + + /** + * Creates the launch context, which describes how to bring up a TaskExecutor process in + * an allocated YARN container. + * + * <p>This code is extremely YARN specific and registers all the resources that the TaskExecutor + * needs (such as JAR file, config file, ...) and all environment variables in a YARN + * container launch context. The launch context then ensures that those resources will be + * copied into the containers transient working directory. + * + * @param flinkConfig + * The Flink configuration object. + * @param yarnConfig + * The YARN configuration object. + * @param env + * The environment variables. + * @param tmParams + * The TaskExecutor container memory parameters. + * @param taskManagerConfig + * The configuration for the TaskExecutors. + * @param workingDirectory + * The current application master container's working directory. + * @param taskManagerMainClass + * The class with the main method. + * @param log + * The logger. + * + * @return The launch context for the TaskManager processes. + * + * @throws Exception Thrown if teh launch context could not be created, for example if + * the resources could not be copied. + */ + private static ContainerLaunchContext createTaskExecutorContext( + Configuration flinkConfig, + YarnConfiguration yarnConfig, + Map<String, String> env, + ContaineredTaskManagerParameters tmParams, + Configuration taskManagerConfig, + String workingDirectory, + Class<?> taskManagerMainClass, + Logger log) throws Exception { + + // get and validate all relevant variables + + String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH); + + String appId = env.get(YarnConfigKeys.ENV_APP_ID); + + String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR); + + String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES); + + String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + + final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH); + log.info("TM:remote keytab path obtained {}", remoteKeytabPath); + + final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal); + + final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH); + log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath); + + final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH); + log.info("TM:remote krb5 path obtained {}", remoteKrb5Path); + + String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH); + + // obtain a handle to the file system used by YARN + final org.apache.hadoop.fs.FileSystem yarnFileSystem; + try { + yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig); + } catch (IOException e) { + throw new Exception("Could not access YARN's default file system", e); + } + + //register keytab + LocalResource keytabResource = null; + if(remoteKeytabPath != null) { + log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath); + keytabResource = Records.newRecord(LocalResource.class); + Path keytabPath = new Path(remoteKeytabPath); + Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource); + } + + //To support Yarn Secure Integration Test Scenario + LocalResource yarnConfResource = null; + LocalResource krb5ConfResource = null; + boolean hasKrb5 = false; + if(remoteYarnConfPath != null && remoteKrb5Path != null) { + log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath); + yarnConfResource = Records.newRecord(LocalResource.class); + Path yarnConfPath = new Path(remoteYarnConfPath); + Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource); + + log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path); + krb5ConfResource = Records.newRecord(LocalResource.class); + Path krb5ConfPath = new Path(remoteKrb5Path); + Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource); + + hasKrb5 = true; + } + + // register Flink Jar with remote HDFS + LocalResource flinkJar = Records.newRecord(LocalResource.class); + { + Path remoteJarPath = new Path(remoteFlinkJarPath); + Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar); + } + + // register conf with local fs + LocalResource flinkConf = Records.newRecord(LocalResource.class); + { + // write the TaskManager configuration to a local file + final File taskManagerConfigFile = + new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml"); + log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath()); + BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile); + + Utils.setupLocalResource(yarnFileSystem, appId, + new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir)); + + log.info("Prepared local resource for modified yaml: {}", flinkConf); + } + + Map<String, LocalResource> taskManagerLocalResources = new HashMap<>(); + taskManagerLocalResources.put("flink.jar", flinkJar); + taskManagerLocalResources.put("flink-conf.yaml", flinkConf); + + //To support Yarn Secure Integration Test Scenario + if(yarnConfResource != null && krb5ConfResource != null) { + taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource); + taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource); + } + + if(keytabResource != null) { + taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource); + } + + // prepare additional files to be shipped + for (String pathStr : shipListString.split(",")) { + if (!pathStr.isEmpty()) { + LocalResource resource = Records.newRecord(LocalResource.class); + Path path = new Path(pathStr); + Utils.registerLocalResource(yarnFileSystem, path, resource); + taskManagerLocalResources.put(path.getName(), resource); + } + } + + // now that all resources are prepared, we can create the launch context + + log.info("Creating container launch context for TaskManagers"); + + boolean hasLogback = new File(workingDirectory, "logback.xml").exists(); + boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists(); + + String launchCommand = BootstrapTools.getTaskManagerShellCommand( + flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR, + hasLogback, hasLog4j, hasKrb5, taskManagerMainClass); + + log.info("Starting TaskManagers with command: " + launchCommand); + + ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); + ctx.setCommands(Collections.singletonList(launchCommand)); + ctx.setLocalResources(taskManagerLocalResources); + + Map<String, String> containerEnv = new HashMap<>(); + containerEnv.putAll(tmParams.taskManagerEnv()); + + // add YARN classpath, etc to the container environment + containerEnv.put(ENV_FLINK_CLASSPATH, classPathString); + Utils.setupYarnClassPath(yarnConfig, containerEnv); + + containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); + + if(remoteKeytabPath != null && remoteKeytabPrincipal != null) { + containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath); + containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal); + } + + ctx.setEnvironment(containerEnv); + + try (DataOutputBuffer dob = new DataOutputBuffer()) { + log.debug("Adding security tokens to Task Executor Container launch Context...."); + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + Credentials credentials = user.getCredentials(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ctx.setTokens(securityTokens); + } + catch (Throwable t) { + log.error("Getting current user info failed when trying to launch the container", t); + } + + return ctx; + } +}
