cuspymd commented on a change in pull request #4098:
URL: https://github.com/apache/zeppelin/pull/4098#discussion_r616012945



##########
File path: 
zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
##########
@@ -55,6 +64,65 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, 
RecoveryStorage rec
     }
     envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
     envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
+
+    // yarn application mode specific logic
+    if (context.getProperties().getProperty("flink.execution.mode")
+            .equalsIgnoreCase("yarn_application")) {
+      updateEnvsForYarnApplicationMode(envs, context);
+    }
+
     return envs;
   }
+
+  private void updateEnvsForYarnApplicationMode(Map<String, String> envs,
+                                                InterpreterLaunchContext 
context) {
+    envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
+
+    StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder();
+
+    // Extract yarn.ship-files, add hive-site.xml automatically if hive is 
enabled
+    // and HIVE_CONF_DIR is specified
+    String hiveConfDirProperty = 
context.getProperties().getProperty("HIVE_CONF_DIR");
+    List<String> yarnShipFiles = new ArrayList<>();
+    if (StringUtils.isNotBlank(hiveConfDirProperty) &&
+            Boolean.parseBoolean(context.getProperties()
+                    .getProperty("zeppelin.flink.enableHive", "false"))) {
+      File hiveSiteFile = new File(hiveConfDirProperty, "hive-site.xml");
+      if (hiveSiteFile.isFile() && hiveSiteFile.exists()) {
+        yarnShipFiles.add(hiveSiteFile.getAbsolutePath());
+      } else {
+        LOGGER.warn("Hive site file: {} doesn't exist or is not a directory", 
hiveSiteFile);
+      }
+    }
+    if (context.getProperties().containsKey("yarn.ship-files")) {
+      
yarnShipFiles.add(context.getProperties().getProperty("yarn.ship-files"));
+    }

Review comment:
       Extracting this logic into a separate `getYarnShipFiles()` method would
improve readability.

##########
File path: 
zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
##########
@@ -55,6 +64,65 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, 
RecoveryStorage rec
     }
     envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
     envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
+
+    // yarn application mode specific logic
+    if (context.getProperties().getProperty("flink.execution.mode")
+            .equalsIgnoreCase("yarn_application")) {
+      updateEnvsForYarnApplicationMode(envs, context);
+    }
+
     return envs;
   }
+
+  private void updateEnvsForYarnApplicationMode(Map<String, String> envs,
+                                                InterpreterLaunchContext 
context) {
+    envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
+
+    StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder();
+
+    // Extract yarn.ship-files, add hive-site.xml automatically if hive is 
enabled
+    // and HIVE_CONF_DIR is specified
+    String hiveConfDirProperty = 
context.getProperties().getProperty("HIVE_CONF_DIR");
+    List<String> yarnShipFiles = new ArrayList<>();
+    if (StringUtils.isNotBlank(hiveConfDirProperty) &&
+            Boolean.parseBoolean(context.getProperties()
+                    .getProperty("zeppelin.flink.enableHive", "false"))) {
+      File hiveSiteFile = new File(hiveConfDirProperty, "hive-site.xml");
+      if (hiveSiteFile.isFile() && hiveSiteFile.exists()) {
+        yarnShipFiles.add(hiveSiteFile.getAbsolutePath());
+      } else {
+        LOGGER.warn("Hive site file: {} doesn't exist or is not a directory", 
hiveSiteFile);
+      }
+    }
+    if (context.getProperties().containsKey("yarn.ship-files")) {
+      
yarnShipFiles.add(context.getProperties().getProperty("yarn.ship-files"));
+    }
+    if (!yarnShipFiles.isEmpty()) {
+      flinkYarnApplicationConfBuilder.append(
+              " -D yarn.ship-files=" + 
yarnShipFiles.stream().collect(Collectors.joining(",")));
+    }
+
+    // specify yarn.application.name
+    String yarnAppName = 
context.getProperties().getProperty("flink.yarn.appName");
+    if (StringUtils.isNotBlank(yarnAppName)) {
+      // flink run command can not contains whitespace, so replace it with _
+      flinkYarnApplicationConfBuilder.append(
+              " -D yarn.application.name=" + yarnAppName.replaceAll(" ", "_") 
+ "");
+    }
+
+    // add other yarn and python configuration.
+    for (Map.Entry<Object, Object> entry : context.getProperties().entrySet()) 
{
+      if (!entry.getKey().toString().equalsIgnoreCase("yarn.ship-files") &&
+              
!entry.getKey().toString().equalsIgnoreCase("flink.yarn.appName")) {
+        if 
(CharMatcher.whitespace().matchesAnyOf(entry.getValue().toString())) {
+          LOGGER.warn("flink configuration key {} is skipped because it 
contains white space",
+                  entry.getValue().toString());
+        } else {
+          flinkYarnApplicationConfBuilder.append(
+                  " -D " + entry.getKey().toString() + "=" + 
entry.getValue().toString() + "");
+        }
+      }
+    }

Review comment:
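       `entry.getKey().toString()` and `entry.getValue().toString()` are
repeated several times, so it would be clearer to assign them to explicitly
declared local variables (e.g. `key` and `value`). That would also make
mix-ups easier to spot: the warning message says "flink configuration key {}
is skipped" but passes `entry.getValue().toString()` as its argument.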




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to