Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/292#discussion_r23172849
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---
    @@ -0,0 +1,653 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.yarn;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.PrintStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.flink.client.FlinkYarnSessionCli;
    +import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
    +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
    +import org.apache.hadoop.yarn.api.records.ApplicationReport;
    +import org.apache.hadoop.yarn.api.records.YarnApplicationState;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsAction;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import 
org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    +import org.apache.hadoop.yarn.api.records.LocalResource;
    +import org.apache.hadoop.yarn.api.records.NodeReport;
    +import org.apache.hadoop.yarn.api.records.NodeState;
    +import org.apache.hadoop.yarn.api.records.QueueInfo;
    +import org.apache.hadoop.yarn.api.records.Resource;
    +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
    +import org.apache.hadoop.yarn.client.api.YarnClient;
    +import org.apache.hadoop.yarn.client.api.YarnClientApplication;
    +import org.apache.hadoop.yarn.exceptions.YarnException;
    +import org.apache.hadoop.yarn.util.Records;
    +
    +/**
    + * All classes in this package contain code taken from
    + * 
https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
    + * and
    + * https://github.com/hortonworks/simple-yarn-app
    + * and
    + * 
https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
    + *
    + * The Flink jar is uploaded to HDFS by this client.
    + * The application master and all the TaskManager containers get the jar 
file downloaded
    + * by YARN into their local fs.
    + *
    + */
    +public class FlinkYarnClient extends AbstractFlinkYarnClient {
    +   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnClient.class);
    +
    +   /**
    +    * Constants,
    +    * all starting with ENV_ are used as environment variables to pass 
values from the Client
    +    * to the Application Master.
    +    */
    +   public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
    +   public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
    +   public final static String ENV_APP_ID = "_APP_ID";
    +   public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the 
Flink jar resource location (in HDFS).
    +   public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
    +   public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
    +   public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
    +   public static final String ENV_SLOTS = "_SLOTS";
    +   public static final String ENV_DYNAMIC_PROPERTIES = 
"_DYNAMIC_PROPERTIES";
    +
    +   private static final String DEFAULT_QUEUE_NAME = "default";
    +
    +
    +   /**
    +    * Minimum memory requirements, checked by the Client.
    +    */
    +   private static final int MIN_JM_MEMORY = 128;
    +   private static final int MIN_TM_MEMORY = 128;
    +
    +   private Configuration conf;
    +   private YarnClient yarnClient;
    +   private YarnClientApplication yarnApplication;
    +
    +
    +   /**
    +    * Files (usually in a distributed file system) used for the YARN 
session of Flink.
    +    * Contains configuration files and jar files.
    +    */
    +   private Path sessionFilesDir;
    +
    +   /**
    +    * If the user has specified a different number of slots, we store them 
here
    +    */
    +   private int slots = -1;
    +
    +   private int jobManagerMemoryMb = 512;
    +
    +   private int taskManagerMemoryMb = 512;
    +
    +   private int taskManagerCount = 1;
    +
    +   private String yarnQueue = DEFAULT_QUEUE_NAME;
    +
    +   private String configurationDirectory;
    +
    +   private Path flinkConfigurationPath;
    +
    +   private Path flinkLoggingConfigurationPath; // optional
    +
    +   private Path flinkJarPath;
    +
    +   private String dynamicPropertiesEncoded;
    +
    +   private List<File> shipFiles = new ArrayList<File>();
    +
    +
    +   public FlinkYarnClient() {
    +           // Check if security is enabled
    +           if(UserGroupInformation.isSecurityEnabled()) {
    +                   throw new RuntimeException("Flink YARN client does not 
have security support right now."
    +                                   + "File a bug, we will fix it asap");
    --- End diff --
    
    You think so? ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to