Hi mailpig,

I checked the code and reproduced this problem. I created a JIRA issue for it: https://issues.apache.org/jira/browse/KYLIN-3838. If you have already fixed it, a PR is welcome.

------------------
Best Regards,
Chao Long
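For whoever picks this up, here is a minimal sketch of one possible direction (an untested assumption on my part, not necessarily what the final KYLIN-3838 patch will look like): let the failure branch of SparkExecutable.doWork attach a Throwable to the result, so that needRetry, quoted below, receives a non-null cause.

    // Sketch only: replace the bare ERROR result in the failure branch.
    // clear SPARK_JOB_ID on job failure.
    extra = mgr.getOutput(getId()).getExtra();
    extra.put(ExecutableConstants.SPARK_JOB_ID, "");
    getManager().addJobInfo(getId(), extra);
    // Wrap the captured spark-submit output in an exception so that the
    // scheduler's retry check sees a non-null Throwable instead of null.
    String error = result != null ? result.getSecond() : "";
    return ExecuteResult.createError(new RuntimeException("Spark job failed: " + error));

Note that the wrapped exception class still has to pass isRetryableException, so depending on the retry configuration a more specific exception type may be appropriate.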
------------------ Original ------------------
From: "mailpig" <[email protected]>;
Date: Mon, Feb 18, 2019 05:40 PM
To: "dev" <[email protected]>;
Subject: Retry mechanism is invalid when build with spark

In Kylin 2.5.2, the retry mechanism is invalid when building with Spark. In SparkExecutable, when the Spark application fails, the callable returns a result pair with exit code -1. The doWork method then returns an ExecuteResult without a Throwable object (it is null). The source code is:

    @SuppressWarnings("checkstyle:methodlength")
    @Override
    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
        ExecutableManager mgr = getManager();
        Map<String, String> extra = mgr.getOutput(getId()).getExtra();
        if (extra.containsKey(ExecutableConstants.SPARK_JOB_ID)) {
            return onResumed(extra.get(ExecutableConstants.SPARK_JOB_ID), mgr);
        } else {
            String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
            CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
            final KylinConfig config = cube.getConfig();
            setAlgorithmLayer();

            if (KylinConfig.getSparkHome() == null) {
                throw new NullPointerException();
            }
            if (config.getKylinJobJarPath() == null) {
                throw new NullPointerException();
            }
            String jars = this.getParam(JARS);

            //hadoop conf dir
            String hadoopConf = null;
            hadoopConf = System.getProperty("kylin.hadoop.conf.dir");

            if (StringUtils.isEmpty(hadoopConf)) {
                throw new RuntimeException(
                        "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
            }

            logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");

            String jobJar = config.getKylinJobJarPath();
            if (StringUtils.isEmpty(jars)) {
                jars = jobJar;
            }

            String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
            CubeSegment segment = cube.getSegmentById(segmentID);
            Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
            dumpMetadata(segment, mergingSeg);

            StringBuilder stringBuilder = new StringBuilder();
            if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
                stringBuilder.append(
                        "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
            } else {
                stringBuilder.append(
                        "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
            }

            Map<String, String> sparkConfs = config.getSparkConfigOverride();

            String sparkConfigName = getSparkConfigName();
            if (sparkConfigName != null) {
                Map<String, String> sparkSpecificConfs = config.getSparkConfigOverrideWithSpecificName(sparkConfigName);
                sparkConfs.putAll(sparkSpecificConfs);
            }

            for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
                stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue())
                        .append(" ");
            }

            stringBuilder.append("--jars %s %s %s");
            final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,
                    KylinConfig.getSparkHome(), jars, jobJar, formatArgs());
            logger.info("cmd: " + cmd);

            final ExecutorService executorService = Executors.newSingleThreadExecutor();
            final CliCommandExecutor exec = new CliCommandExecutor();
            final PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
                @Override
                public void onLogEvent(String infoKey, Map<String, String> info) {
                    // only care three properties here
                    if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
                            || ExecutableConstants.YARN_APP_ID.equals(infoKey)
                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
                        getManager().addJobInfo(getId(), info);
                    }
                }
            });

            Callable callable = new Callable<Pair<Integer, String>>() {
                @Override
                public Pair<Integer, String> call() throws Exception {
                    Pair<Integer, String> result;
                    try {
                        result = exec.execute(cmd, patternedLogger);
                    } catch (Exception e) {
                        logger.error("error run spark job:", e);
                        // <-- the exception is swallowed here; only (-1, message) survives
                        result = new Pair<>(-1, e.getMessage());
                    }
                    return result;
                }
            };

            try {
                Future<Pair<Integer, String>> future = executorService.submit(callable);
                Pair<Integer, String> result = null;
                while (!isDiscarded() && !isPaused()) {
                    if (future.isDone()) {
                        result = future.get();
                        break;
                    } else {
                        Thread.sleep(5000);
                    }
                }
                if (future.isDone() == false) { // user cancelled
                    executorService.shutdownNow(); // interrupt
                    extra = mgr.getOutput(getId()).getExtra();
                    if (extra != null && extra.get(ExecutableConstants.SPARK_JOB_ID) != null) {
                        killAppRetry(extra.get(ExecutableConstants.SPARK_JOB_ID));
                    }
                    if (isDiscarded()) {
                        return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
                    }
                    if (isPaused()) {
                        return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
                    }
                    throw new IllegalStateException();
                }
                if (result == null) {
                    result = future.get();
                }
                if (result != null && result.getFirst() == 0) {
                    // done, update all properties
                    Map<String, String> joblogInfo = patternedLogger.getInfo();
                    // read counter from hdfs
                    String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
                    if (counterOutput != null) {
                        if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
                            Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
                            joblogInfo.putAll(counterMap);
                        } else {
                            logger.warn("Spark counter output path not exists: " + counterOutput);
                        }
                    }
                    readCounters(joblogInfo);
                    getManager().addJobInfo(getId(), joblogInfo);
                    return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
                }
                // clear SPARK_JOB_ID on job failure.
                extra = mgr.getOutput(getId()).getExtra();
                extra.put(ExecutableConstants.SPARK_JOB_ID, "");
                getManager().addJobInfo(getId(), extra);
                // <-- failure path: ERROR result built without any Throwable
                return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : "");
            } catch (Exception e) {
                logger.error("error run spark job:", e);
                return ExecuteResult.createError(e);
            }
        }
    }
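To make the consequence concrete, a minimal illustration (my assumption: the two-argument ExecuteResult constructor leaves the Throwable field null, consistent with the separate ExecuteResult.createError(Throwable) factory used in the catch block above):

    // Hypothetical illustration, not Kylin source: this is what the failure
    // path above effectively hands back to the scheduler.
    Pair<Integer, String> result = new Pair<>(-1, "spark-submit exited abnormally");
    ExecuteResult r = new ExecuteResult(ExecuteResult.State.ERROR, result.getSecond());
    // r carries no Throwable, so the retry check below is invoked with
    // t == null and returns false -- the job fails permanently, no retry.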
However, Kylin's retry mechanism depends on the Throwable object in the ExecuteResult. If the Throwable is null, it will not retry the job. Source code:

    public boolean needRetry(int retry, Throwable t) {
        if (retry > KylinConfig.getInstanceFromEnv().getJobRetry() || t == null
                || (this instanceof DefaultChainedExecutable)) {
            return false;
        } else {
            return isRetryableException(t.getClass().getName());
        }
    }

Please check!

--
Sent from: http://apache-kylin.74782.x6.nabble.com/
