cuspymd commented on a change in pull request #4098: URL: https://github.com/apache/zeppelin/pull/4098#discussion_r616012945
########## File path: zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java ########## @@ -55,6 +64,65 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec } envs.put("FLINK_LIB_DIR", flinkHome + "/lib"); envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins"); + + // yarn application mode specific logic + if (context.getProperties().getProperty("flink.execution.mode") + .equalsIgnoreCase("yarn_application")) { + updateEnvsForYarnApplicationMode(envs, context); + } + return envs; } + + private void updateEnvsForYarnApplicationMode(Map<String, String> envs, + InterpreterLaunchContext context) { + envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true"); + + StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder(); + + // Extract yarn.ship-files, add hive-site.xml automatically if hive is enabled + // and HIVE_CONF_DIR is specified + String hiveConfDirProperty = context.getProperties().getProperty("HIVE_CONF_DIR"); + List<String> yarnShipFiles = new ArrayList<>(); + if (StringUtils.isNotBlank(hiveConfDirProperty) && + Boolean.parseBoolean(context.getProperties() + .getProperty("zeppelin.flink.enableHive", "false"))) { + File hiveSiteFile = new File(hiveConfDirProperty, "hive-site.xml"); + if (hiveSiteFile.isFile() && hiveSiteFile.exists()) { + yarnShipFiles.add(hiveSiteFile.getAbsolutePath()); + } else { + LOGGER.warn("Hive site file: {} doesn't exist or is not a directory", hiveSiteFile); + } + } + if (context.getProperties().containsKey("yarn.ship-files")) { + yarnShipFiles.add(context.getProperties().getProperty("yarn.ship-files")); + } Review comment: Extracting this logic into a separate `getYarnShipFiles()` method would improve readability. 
########## File path: zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java ########## @@ -55,6 +64,65 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec } envs.put("FLINK_LIB_DIR", flinkHome + "/lib"); envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins"); + + // yarn application mode specific logic + if (context.getProperties().getProperty("flink.execution.mode") + .equalsIgnoreCase("yarn_application")) { + updateEnvsForYarnApplicationMode(envs, context); + } + return envs; } + + private void updateEnvsForYarnApplicationMode(Map<String, String> envs, + InterpreterLaunchContext context) { + envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true"); + + StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder(); + + // Extract yarn.ship-files, add hive-site.xml automatically if hive is enabled + // and HIVE_CONF_DIR is specified + String hiveConfDirProperty = context.getProperties().getProperty("HIVE_CONF_DIR"); + List<String> yarnShipFiles = new ArrayList<>(); + if (StringUtils.isNotBlank(hiveConfDirProperty) && + Boolean.parseBoolean(context.getProperties() + .getProperty("zeppelin.flink.enableHive", "false"))) { + File hiveSiteFile = new File(hiveConfDirProperty, "hive-site.xml"); + if (hiveSiteFile.isFile() && hiveSiteFile.exists()) { + yarnShipFiles.add(hiveSiteFile.getAbsolutePath()); + } else { + LOGGER.warn("Hive site file: {} doesn't exist or is not a directory", hiveSiteFile); + } + } + if (context.getProperties().containsKey("yarn.ship-files")) { + yarnShipFiles.add(context.getProperties().getProperty("yarn.ship-files")); + } + if (!yarnShipFiles.isEmpty()) { + flinkYarnApplicationConfBuilder.append( + " -D yarn.ship-files=" + yarnShipFiles.stream().collect(Collectors.joining(","))); + } + + // specify yarn.application.name + String yarnAppName = context.getProperties().getProperty("flink.yarn.appName"); + if (StringUtils.isNotBlank(yarnAppName)) { + // 
flink run command can not contains whitespace, so replace it with _ + flinkYarnApplicationConfBuilder.append( + " -D yarn.application.name=" + yarnAppName.replaceAll(" ", "_") + ""); + } + + // add other yarn and python configuration. + for (Map.Entry<Object, Object> entry : context.getProperties().entrySet()) { + if (!entry.getKey().toString().equalsIgnoreCase("yarn.ship-files") && + !entry.getKey().toString().equalsIgnoreCase("flink.yarn.appName")) { + if (CharMatcher.whitespace().matchesAnyOf(entry.getValue().toString())) { + LOGGER.warn("flink configuration key {} is skipped because it contains white space", + entry.getValue().toString()); + } else { + flinkYarnApplicationConfBuilder.append( + " -D " + entry.getKey().toString() + "=" + entry.getValue().toString() + ""); + } + } + } Review comment: `entry.getKey().toString()` and `entry.getValue().toString()` are repeated several times, so it would be better to assign them to local variables once and reuse them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org