Repository: falcon
Updated Branches: refs/heads/master a78795c3d -> d0bc18860
FALCON-2057 HiveDR not working with multiple users and same DB

Author: bvellanki <[email protected]>

Reviewers: "yzheng-hortonworks <[email protected]>, Sowmya Ramesh <[email protected]>, Venkat Ranganathan <[email protected]>"

Closes #203 from bvellanki/FALCON-2057

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d0bc1886
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d0bc1886
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d0bc1886

Branch: refs/heads/master
Commit: d0bc188601566dbf76555a115c7ea9c68ca13909
Parents: a78795c
Author: bvellanki <[email protected]>
Authored: Thu Jun 30 17:21:06 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Thu Jun 30 17:21:06 2016 -0700

----------------------------------------------------------------------
 .../falcon/hive/util/HiveDRStatusStore.java | 23 +++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/falcon/blob/d0bc1886/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
index 900afe8..76eda87 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -71,6 +72,8 @@ public class HiveDRStatusStore extends DRStatusStore {
         Path basePath = new Path(BASE_DEFAULT_STORE_PATH);
         FileUtils.validatePath(fileSystem, basePath);

+        // Current limitation is that only users who belong to DRStatusStore.storeGroup can submit HiveDR jobs.
+        // BaseDir for status store is created with permissions 770 so that all eligible users can access statusStore.
         Path storePath = new Path(DEFAULT_STORE_PATH);
         if (!fileSystem.exists(storePath)) {
             if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) {
@@ -163,10 +166,11 @@ public class HiveDRStatusStore extends DRStatusStore {
     private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName,
                                                        String database) throws HiveReplicationException{
         DBReplicationStatus dbReplicationStatus = null;
+        Path statusDbDirPath = getStatusDbDirPath(database);
         Path statusDirPath = getStatusDirPath(database, jobName);
+
         // check if database name or jobName can contain chars not allowed by hdfs dir/file naming.
         // if yes, use md5 of the same for dir names. prefer to use actual db names for readability.
-
         try {
             if (fileSystem.exists(statusDirPath)) {
                 dbReplicationStatus = readStatusFile(statusDirPath);
@@ -176,6 +180,15 @@ public class HiveDRStatusStore extends DRStatusStore {
                 ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName,
                         database, null, ReplicationStatus.Status.INIT, -1);
                 dbReplicationStatus = new DBReplicationStatus(initDbStatus);
+
+                // Create parent dir first with default status store permissions. FALCON-2057
+                if (!fileSystem.exists(statusDbDirPath)) {
+                    if (!FileSystem.mkdirs(fileSystem, statusDbDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
+                        String error = "mkdir failed for " + statusDbDirPath.toString();
+                        LOG.error(error);
+                        throw new HiveReplicationException(error);
+                    }
+                }
                 if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
                     String error = "mkdir failed for " + statusDirPath.toString();
                     LOG.error(error);
@@ -197,7 +210,11 @@ public class HiveDRStatusStore extends DRStatusStore {
     }

     public Path getStatusDirPath(String database, String jobName) {
-        return new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/" + jobName);
+        return new Path(getStatusDbDirPath(database), jobName);
+    }
+
+    public Path getStatusDbDirPath(String dbName) {
+        return new Path(new Path(BASE_DEFAULT_STORE_PATH), dbName.toLowerCase());
     }

     private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException {
@@ -271,7 +288,7 @@ public class HiveDRStatusStore extends DRStatusStore {
     public void checkForReplicationConflict(String newSource, String jobName,
                                              String database, String table) throws HiveReplicationException {
         try {
-            Path globPath = new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/*/latest.json");
+            Path globPath = new Path(getStatusDbDirPath(database), "*" + File.separator + "latest.json");
             FileStatus[] files = fileSystem.globStatus(globPath);
             for(FileStatus file : files) {
                 DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(
