This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 17ae024611313ed254dc6dc3cc9a06540db013a4 Author: wangxiaojing <[email protected]> AuthorDate: Wed May 6 15:14:10 2020 +0800 KYLIN-4348 Fix distributed concurrency lock bug --- .../lock/zookeeper/ZookeeperDistributedLock.java | 2 +- .../org/apache/kylin/rest/service/JobService.java | 26 ++- .../kylin/source/hive/CreateMrHiveDictStep.java | 195 ++++++++++++++++----- .../apache/kylin/source/hive/HiveInputBase.java | 3 +- .../apache/kylin/source/hive/MRHiveDictUtil.java | 15 +- 5 files changed, 191 insertions(+), 50 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java index 298be56..36c0ad1 100644 --- a/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java +++ b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java @@ -144,7 +144,7 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { try { curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(lockPath, clientBytes); } catch (KeeperException.NodeExistsException ex) { - logger.debug("{} see {} is already locked", client, lockPath); + logger.debug("{} check {} is already locked", client, lockPath); } catch (Exception ex) { throw new IllegalStateException("Error while " + client + " trying to lock " + lockPath, ex); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index a6c4eea..d1ec4cf 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -79,6 +79,7 @@ import org.apache.kylin.source.ISource; import org.apache.kylin.source.SourceManager; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock; +import org.apache.kylin.source.hive.MRHiveDictUtil; import org.apache.kylin.stream.coordinator.Coordinator; import org.apache.kylin.stream.core.model.SegmentBuildState; import org.slf4j.Logger; @@ -643,15 +644,26 @@ public class JobService extends BasicService implements InitializingBean { if (executable.getStatus().isFinalState()) { try { DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread(); + if (lock.isLocked(MRHiveDictUtil.getLockPath(executable.getCubeName(), job.getId()))) {//release mr/hive global dict lock if exists + lock.purgeLocks(MRHiveDictUtil.getLockPath(executable.getCubeName(), null)); + logger.info("{} unlock global MR/Hive dict lock path({}) success", job.getId(), + MRHiveDictUtil.getLockPath(executable.getCubeName(), null)); + if (lock.isLocked(MRHiveDictUtil.getEphemeralLockPath(executable.getCubeName()))) {//release mr/hive global dict Ephemeral lock if exists + lock.purgeLocks(MRHiveDictUtil.getEphemeralLockPath(executable.getCubeName())); + logger.info("{} unlock global MR/Hive dict ephemeral lock path({}) success", job.getId(), + MRHiveDictUtil.getEphemeralLockPath(executable.getCubeName())); + } + } + if(lock.isLocked(CubeJobLockUtil.getLockPath(executable.getCubeName(), job.getId()))){//release cube job dict lock if exists - lock.purgeLocks(CubeJobLockUtil.getLockPath(executable.getCubeName(), null)); - logger.info("{} unlock cube job dict lock path({}) success", job.getId(), CubeJobLockUtil.getLockPath(executable.getCubeName(), null)); + lock.purgeLocks(CubeJobLockUtil.getLockPath(executable.getCubeName(), null)); + logger.info("{} unlock cube job dict lock path({}) success", job.getId(), CubeJobLockUtil.getLockPath(executable.getCubeName(), null)); - if (lock.isLocked(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()))) {//release cube job Ephemeral lock if exists - lock.purgeLocks(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName())); - logger.info("{} unlock cube job ephemeral lock path({}) success", job.getId(), CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName())); + if (lock.isLocked(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()))) {//release cube job Ephemeral lock if exists + lock.purgeLocks(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName())); + logger.info("{} unlock cube job ephemeral lock path({}) success", job.getId(), CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName())); + } } - } }catch (Exception e){ logger.error("get some error when release cube {} job {} job id {} " , executable.getCubeName(), job.getName(), job.getId()); } @@ -725,7 +737,7 @@ public class JobService extends BasicService implements InitializingBean { + SecurityContextHolder.getContext().getAuthentication().getName()); if (job.getStatus().isComplete()) { throw new IllegalStateException( - "The job " + job.getId() + " has already been finished and cannot be stopped."); + "The job " + job.getId() + " has already been finished and cannot be stopped."); } getExecutableManager().pauseJob(job.getId()); } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java index f3bc4c7..305cdae 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java @@ -44,6 +44,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * @@ -52,31 +54,16 @@ public class CreateMrHiveDictStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(CreateMrHiveDictStep.class); private final PatternedLogger stepLogger = new PatternedLogger(logger); - private DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread(); + private final Lock threadLock = new ReentrantLock(); private static final String GET_SQL = "\" Get Max Dict Value Sql : \""; - protected void createMrHiveDict(KylinConfig config) throws Exception { + protected void createMrHiveDict(KylinConfig config, DistributedLock lock) throws Exception { + logger.info("start to run createMrHiveDict {}", getId()); try { if (getIsLock()) { - String pathName = getLockPathName(); - if (Strings.isNullOrEmpty(pathName)) { - throw new IllegalArgumentException("create Mr-Hive dict lock path name is null"); - } - String lockPath = getLockPath(pathName); - boolean isLocked = true; - long lockStartTime = System.currentTimeMillis(); - while (isLocked) { - isLocked = lock.isLocked(lockPath); - stepLogger.log("zookeeper lock path :" + lockPath + ", result is " + isLocked); - if (!isLocked) { - break; - } - // wait 1 min and try again - Thread.sleep(60000); - } - stepLogger.log("zookeeper get lock costTime : " + ((System.currentTimeMillis() - lockStartTime) / 1000) + " s"); - lock.lock(lockPath); + getLock(lock); } + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(getName()); hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride()); hiveCmdBuilder.addStatement(getInitStatement()); @@ -124,7 +111,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable { final String cmd = hiveCmdBuilder.toString(); - stepLogger.log("MR-Hive dict, cmd: " + cmd); + stepLogger.log("MR/Hive dict, cmd: " + cmd); CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = manager.getCube(getCubeName()); @@ -134,23 +121,16 @@ public class CreateMrHiveDictStep extends AbstractExecutable { } else { Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); if (response.getFirst() != 0) { - throw new RuntimeException("Failed to create mr hive dict, error code " + response.getFirst()); + throw new RuntimeException("Failed to create MR/Hive dict, error code " + response.getFirst()); } getManager().addJobInfo(getId(), stepLogger.getInfo()); } - if (getIsLock()) { - String pathName = getLockPathName(); - if (Strings.isNullOrEmpty(pathName)) { - throw new IllegalArgumentException(" create mr hive dict unlock path name is null"); - } - lock.unlock(getLockPath(pathName)); - stepLogger.log("zookeeper unlock path :" + getLockPathName()); + + if (getIsUnlock()) { + unLock(lock); } + getManager().addJobInfo(getId(), stepLogger.getInfo()); } catch (Exception e) { - if (getIsLock()) { - lock.unlock(getLockPath(getLockPathName())); - stepLogger.log("zookeeper unlock path :" + getLockPathName()); - } logger.error("", e); throw e; } @@ -167,25 +147,42 @@ public class CreateMrHiveDictStep extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { KylinConfig config = getCubeSpecificConfig(); + DistributedLock lock = null; try { + if (getIsLock() || getIsUnlock()) { + lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread(); + } String preHdfsShell = getPreHdfsShell(); if (Objects.nonNull(preHdfsShell) && !"".equalsIgnoreCase(preHdfsShell)) { doRetry(preHdfsShell, config); } - createMrHiveDict(config); + createMrHiveDict(config, lock); String postfixHdfsCmd = getPostfixHdfsShell(); if (Objects.nonNull(postfixHdfsCmd) && !"".equalsIgnoreCase(postfixHdfsCmd)) { doRetry(postfixHdfsCmd, config); } - return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); - + if (isDiscarded()) { + if (getIsLock()) { + unLock(lock); + } + return new ExecuteResult(ExecuteResult.State.DISCARDED, stepLogger.getBufferedLog()); + } else { + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + } } catch (Exception e) { logger.error("job:" + getId() + " execute finished with exception", e); - return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + if (isDiscarded()) { + if (getIsLock()) { + unLock(lock); + } + return new ExecuteResult(ExecuteResult.State.DISCARDED, stepLogger.getBufferedLog()); + } else { + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } } } @@ -267,6 +264,14 @@ public class CreateMrHiveDictStep extends AbstractExecutable { return Strings.isNullOrEmpty(isLock) ? false : Boolean.parseBoolean(isLock); } + public void setJobFlowJobId(String jobId) { + setParam("jobFlowJobId", jobId); + } + + public String getJobFlowJobId() { + return getParam("jobFlowJobId"); + } + public void setIsUnLock(Boolean isUnLock) { setParam("isUnLock", String.valueOf(isUnLock)); } @@ -284,6 +289,119 @@ public class CreateMrHiveDictStep extends AbstractExecutable { return getParam("lockPathName"); } + private String getMRDictLockPathName() { + String pathName = getLockPathName(); + if (Strings.isNullOrEmpty(pathName)) { + throw new IllegalArgumentException(" create MR/Hive dict lock path name is null"); + } + + String flowJobId = getJobFlowJobId(); + if (Strings.isNullOrEmpty(flowJobId)) { + throw new IllegalArgumentException(" create MR/Hive dict lock path flowJobId is null"); + } + return MRHiveDictUtil.getLockPath(pathName, flowJobId); + } + + private String getMRDictLockParentPathName() { + String pathName = getLockPathName(); + if (Strings.isNullOrEmpty(pathName)) { + throw new IllegalArgumentException(" create MR/Hive dict lock path name is null"); + } + return MRHiveDictUtil.getLockPath(pathName, null); + } + + private String getEphemeralLockPathName() { + String pathName = getLockPathName(); + if (Strings.isNullOrEmpty(pathName)) { + throw new IllegalArgumentException(" create MR/Hive dict lock path name is null"); + } + + return MRHiveDictUtil.getEphemeralLockPath(pathName); + } + + private void getLock(DistributedLock lock) throws InterruptedException { + logger.info("{} try to get global MR/Hive ZK lock", getId()); + String ephemeralLockPath = getEphemeralLockPathName(); + String fullLockPath = getMRDictLockPathName(); + boolean isLocked = true; + boolean getLocked = false; + long lockStartTime = System.currentTimeMillis(); + + boolean isLockedByTheJob = lock.isLocked(fullLockPath); + logger.info("{} global MR/Hive ZK lock is isLockedByTheJob:{}", getId(), isLockedByTheJob); + if (!isLockedByTheJob) { + while (isLocked) { + isLocked = lock.isLocked(getMRDictLockParentPathName());//other job global lock + + if (!isLocked) { + isLocked = lock.isLocked(ephemeralLockPath);//get the ephemeral current lock + stepLogger.log("zookeeper lock path :" + ephemeralLockPath + ", result is " + isLocked); + logger.info("zookeeper lock path :{}, is locked by other job result is {}", ephemeralLockPath, + isLocked); + + if (!isLocked) { + //try to get ephemeral lock + try { + logger.debug("{} before start to get lock ephemeralLockPath {}", getId(), ephemeralLockPath); + threadLock.lock(); + logger.debug("{} start to get lock ephemeralLockPath {}", getId(), ephemeralLockPath); + getLocked = lock.lock(ephemeralLockPath); + logger.debug("{} finish get lock ephemeralLockPath {},getLocked {}", getId(), ephemeralLockPath, getLocked); + } finally { + threadLock.unlock(); + logger.debug("{} finish unlock the thread lock ,ephemeralLockPath {} ", getId(), ephemeralLockPath); + } + + if (getLocked) {//get ephemeral lock success + try { + getLocked = lock.globalPermanentLock(fullLockPath);//add the fullLockPath lock in case of the server crash then the other can the same job can get the lock + if (getLocked) { + break; + } else { + if (lock.isLocked(ephemeralLockPath)) { + lock.unlock(ephemeralLockPath); + } + } + } catch (Exception e) { + if (lock.isLocked(ephemeralLockPath)) { + lock.unlock(ephemeralLockPath); + } + } + } + isLocked = true;//get lock fail,will try again + } + } + // wait 1 min and try again + logger.info( + "{},global parent lock path({}) is locked by other job result is {} ,ephemeral lock path :{} is locked by other job result is {},will try after one minute", + getId(), getMRDictLockParentPathName(), isLocked, ephemeralLockPath, isLocked); + Thread.sleep(60000); + } + } else { + lock.lock(ephemeralLockPath); + } + stepLogger.log("zookeeper get lock costTime : " + ((System.currentTimeMillis() - lockStartTime) / 1000) + " s"); + long useSec = ((System.currentTimeMillis() - lockStartTime) / 1000); + logger.info("job {} get zookeeper lock path:{} success,zookeeper get lock costTime : {} s", getId(), + fullLockPath, useSec); + } + + private void unLock(DistributedLock lock) { + String parentLockPath = getMRDictLockParentPathName(); + String ephemeralLockPath = getEphemeralLockPathName(); + if (lock.isLocked(getMRDictLockPathName())) { + lock.purgeLocks(parentLockPath); + stepLogger.log("zookeeper unlock path :" + parentLockPath); + logger.info("{} unlock full lock path :{} success", getId(), parentLockPath); + } + + if (lock.isLocked(ephemeralLockPath)) { + lock.purgeLocks(ephemeralLockPath); + stepLogger.log("zookeeper unlock path :" + ephemeralLockPath); + logger.info("{} unlock full lock path :{} success", getId(), ephemeralLockPath); + } + } + private static String serilizeToMap(Map<String, String> map) { JSONArray result = new JSONArray(); if (map != null && map.size() > 0) { @@ -322,7 +440,4 @@ public class CreateMrHiveDictStep extends AbstractExecutable { return result; } - private String getLockPath(String pathName) { - return MRHiveDictUtil.DictHiveType.MrDictLockPath.getName() + pathName; - } } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index 624c8f9..49e3f8d 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -208,6 +208,7 @@ public class HiveInputBase { step.setIsLock(true); step.setIsUnLock(false); step.setLockPathName(cubeName); + step.setJobFlowJobId(jobId); return step; } @@ -305,7 +306,7 @@ public class HiveInputBase { step.setCreateTableStatementMap(dictHqlMap); step.setIsUnLock(true); step.setLockPathName(cubeName); - //toDo Fix distributed concurrency lock bug + step.setJobFlowJobId(jobId); CubingExecutableUtil.setCubeName(cubeName, step.getParams()); step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL); return step; diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java index fd2d103..85cd855 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java @@ -46,7 +46,8 @@ public class MRHiveDictUtil { protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';"); public enum DictHiveType { - GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/"); + GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/"), MrEphemeralDictLockPath( + "/mr_dict_ephemeral_lock/"); private String name; DictHiveType(String name) { @@ -181,6 +182,18 @@ public class MRHiveDictUtil { executableManager.addJobInfo(jobId, info); } + public static String getLockPath(String cubeName, String jobId) { + if (jobId == null) { + return DictHiveType.MrDictLockPath.getName() + cubeName; + } else { + return DictHiveType.MrDictLockPath.getName() + cubeName + "/" + jobId; + } + } + + public static String getEphemeralLockPath(String cubeName) { + return DictHiveType.MrEphemeralDictLockPath.getName() + cubeName; + } + public static String getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) { return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix(); }
