deniskuzZ commented on a change in pull request #1724: URL: https://github.com/apache/hive/pull/1724#discussion_r553312554
########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ########## @@ -774,6 +780,100 @@ private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table) } } + class LocalTableLock implements Closeable{ + + private Optional<HiveLockObject> lock; + private HiveLock lockObj; + + public LocalTableLock(Optional<HiveLockObject> lock) throws LockException { + + this.lock = lock; + if(!lock.isPresent()) { + return; + } + LOG.info("LocalTableLock; locking: " + lock); + HiveLockManager lockMgr = context.getHiveTxnManager().getLockManager(); + lockObj = lockMgr.lock(lock.get(), HiveLockMode.SEMI_SHARED, true); + LOG.info("LocalTableLock; locked: " + lock); + } + + @Override + public void close() throws IOException { + if(!lock.isPresent()) { + return; + } + LOG.info("LocalTableLock; unlocking: "+lock); + HiveLockManager lockMgr; + try { + lockMgr = context.getHiveTxnManager().getLockManager(); + lockMgr.unlock(lockObj); + } catch (LockException e1) { + throw new IOException(e1); + } + LOG.info("LocalTableLock; unlocked"); + } + + } + + static enum LockFileMoveMode { + none, dp, all; + + public static LockFileMoveMode fromConf(HiveConf conf) { + if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY)) { + return none; + } + String lockFileMoveMode = conf.getVar(HiveConf.ConfVars.HIVE_LOCK_FILE_MOVE_MODE); + return valueOf(lockFileMoveMode); + } + } + + private LocalTableLock acquireLockForFileMove(LoadTableDesc loadTableWork) throws HiveException { + LockFileMoveMode mode = LockFileMoveMode.fromConf(conf); + + if (mode == LockFileMoveMode.none) { + return new LocalTableLock(Optional.empty()); + } + if (mode == LockFileMoveMode.dp) { + if (loadTableWork.getDPCtx() == null) { + return new LocalTableLock(Optional.empty()); + } + } + + WriteEntity output = context.getLoadTableOutputMap().get(loadTableWork); + List<HiveLockObj> lockObjects = context.getOutputLockObjects().get(output); + if (lockObjects == null) { + return new 
LocalTableLock(Optional.empty()); + } + TableDesc table = loadTableWork.getTable(); + if(table == null) { + return new LocalTableLock(Optional.empty()); + } + + Hive db = getHive(); + Table baseTable = db.getTable(loadTableWork.getTable().getTableName()); + + HiveLockObject.HiveLockObjectData lockData = + new HiveLockObject.HiveLockObjectData(queryPlan.getQueryId(), + String.valueOf(System.currentTimeMillis()), + "IMPLICIT", + queryPlan.getQueryStr(), + conf); + + HiveLockObject lock = new HiveLockObject(baseTable,lockData); + + for (HiveLockObj hiveLockObj : lockObjects) { Review comment: minor: I would prefer the stream API, e.g. if (locks.stream().filter().anyMatch()){..} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org