This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 36678ab ATLAS-4306: Support for User-specific Spool Directory
36678ab is described below
commit 36678ab1f331eb717578c3b2ed6677544ac3aa2a
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Mon May 31 21:34:44 2021 -0700
ATLAS-4306: Support for User-specific Spool Directory
---
.../atlas/notification/spool/AtlasFileSpool.java | 4 ++-
.../atlas/notification/spool/IndexManagement.java | 6 ++---
.../notification/spool/SpoolConfiguration.java | 22 +++++++++++++---
.../atlas/notification/spool/SpoolUtils.java | 30 +++++++++++++++++++++-
.../apache/atlas/notification/spool/BaseTest.java | 2 +-
5 files changed, 54 insertions(+), 10 deletions(-)
diff --git
a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
index ea31284..0c92c30 100644
---
a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
+++
b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -41,6 +41,7 @@ public class AtlasFileSpool implements NotificationInterface {
private final Publisher publisher;
private Thread publisherThread;
private Boolean initDone = null;
+ private String currentUser;
public AtlasFileSpool(Configuration configuration, AbstractNotification
notificationHandler) {
this.notificationHandler = notificationHandler;
@@ -56,7 +57,7 @@ public class AtlasFileSpool implements NotificationInterface {
if (!isInitDone()) {
try {
- config.setSource(source);
+ config.setSource(source, this.currentUser);
LOG.info("{}: Initialization: Starting...",
this.config.getSourceName());
@@ -86,6 +87,7 @@ public class AtlasFileSpool implements NotificationInterface {
@Override
public void setCurrentUser(String user) {
this.notificationHandler.setCurrentUser(user);
+ this.currentUser = user;
}
@Override
diff --git
a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
index f018983..adbb8d1 100644
---
a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
+++
b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
@@ -54,14 +54,14 @@ public class IndexManagement {
public void init() throws IOException, AtlasException {
String sourceName = config.getSourceName();
- File spoolDir = SpoolUtils.getCreateDirectory(config.getSpoolDir());
-
+ File spoolDir =
SpoolUtils.getCreateDirectoryWithPermissionCheck(config.getSpoolDir(),
config.getUser());
if (spoolDir == null) {
throw new AtlasException(String.format("%s: %s not found or
inaccessible!", sourceName, spoolDir.getAbsolutePath()));
}
- File archiveDir =
SpoolUtils.getCreateDirectory(config.getArchiveDir());
+ config.setSpoolDir(spoolDir.getAbsolutePath());
+ File archiveDir =
SpoolUtils.getCreateDirectory(config.getArchiveDir());
if (archiveDir == null) {
throw new AtlasException(String.format("%s: %s not found or
inaccessible!", sourceName, archiveDir.getAbsolutePath()));
}
diff --git
a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
index 76f05ef..36ea7be 100644
---
a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
+++
b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
@@ -40,19 +40,23 @@ public class SpoolConfiguration {
public static final String PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC
= PROPERTY_PREFIX_SPOOL + "pause.before.send.sec";
private static final String PROP_HIVE_METASTORE_NAME
= PROPERTY_PREFIX_SPOOL + "hivemetastore.name";
+ private final Configuration config;
+
private final String messageHandlerName;
private final int maxArchivedFilesCount;
private final int messageBatchSize;
private final int retryDestinationMS;
private final int fileRollOverSec;
private final int fileSpoolMaxFilesCount;
- private final String spoolDirPath;
- private final String archiveDir;
+ private String spoolDirPath;
+ private String archiveDir;
private final int pauseBeforeSendSec;
private final String hiveMetaStoreName;
private String sourceName;
+ private String user;
public SpoolConfiguration(Configuration cfg, String messageHandlerName) {
+ this.config = cfg;
this.messageHandlerName = messageHandlerName;
this.maxArchivedFilesCount =
cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT,
PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
this.messageBatchSize = cfg.getInt(PROP_MESSAGE_BATCH_SIZE,
PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT);
@@ -65,8 +69,9 @@ public class SpoolConfiguration {
this.hiveMetaStoreName = cfg.getString(PROP_HIVE_METASTORE_NAME,
PROP_HIVE_METASTORE_NAME_DEFAULT);
}
- public void setSource(String val) {
- this.sourceName = val;
+ public void setSource(String source, String user) {
+ this.sourceName = source;
+ this.user = user;
}
public String getSourceName() {
@@ -97,7 +102,12 @@ public class SpoolConfiguration {
return new File(getSpoolDirPath());
}
+ public void setSpoolDir(String absolutePath) {
+ this.spoolDirPath = absolutePath;
+ }
+
public File getArchiveDir() {
+ this.archiveDir = config.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new
File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString());
return new File(this.archiveDir);
}
@@ -136,4 +146,8 @@ public class SpoolConfiguration {
public boolean isHiveMetaStore() {
return this.sourceName.equals(this.hiveMetaStoreName);
}
+
+ public String getUser() {
+ return this.user;
+ }
}
diff --git
a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
index abbe33d..9ee4c80 100644
---
a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
+++
b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
@@ -37,6 +37,7 @@ import java.text.SimpleDateFormat;
public class SpoolUtils {
private static final Logger LOG =
LoggerFactory.getLogger(SpoolUtils.class);
+ private static final String USER_SPECIFIC_PATH_NAME_FORMAT =
"%s-%s";
public static final String DEFAULT_CHAR_SET =
"UTF-8";
private static final String DEFAULT_LINE_SEPARATOR =
System.getProperty("line.separator");
private static final String FILE_EXT_JSON =
".json";
@@ -72,6 +73,33 @@ public class SpoolUtils {
return ret;
}
+    /**
+     * Resolves the directory to use for {@code file}, falling back to a
+     * user-specific sibling directory when {@code file} is not both readable
+     * and writable.
+     *
+     * @param file the preferred directory; created via {@link #getCreateDirectory(File)}
+     * @param user the user whose name is appended to build the fallback directory;
+     *             when {@code null} or empty, the {@code user.name} system property
+     *             is used so the fallback is never named {@code <dir>-null}
+     * @return the directory to use; this is whatever {@link #getCreateDirectory(File)}
+     *         returned for the preferred or the fallback directory, so it is
+     *         {@code null} when that call reports failure
+     */
+    public static File getCreateDirectoryWithPermissionCheck(File file, String user) {
+        File ret = getCreateDirectory(file);
+
+        // The placeholder needs its argument; without it the literal "{}" is logged.
+        LOG.info("SpoolUtils.getCreateDirectory({}): Checking permissions...", file);
+        if (!file.canWrite() || !file.canRead()) {
+            // A missing user name must not leak into the path as "null" or as a dangling '-'.
+            String suffixUser = (user == null || user.isEmpty()) ? System.getProperty("user.name") : user;
+            File fileWithUserSuffix = getFileWithUserSuffix(file, suffixUser);
+
+            LOG.error("SpoolUtils.getCreateDirectory({}, {}): Insufficient permissions for user: {}! Will create: {}",
+                    file.getAbsolutePath(), user, user, fileWithUserSuffix);
+            ret = getCreateDirectory(fileWithUserSuffix);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Builds the user-specific sibling of a directory: same parent, with the
+     * directory name and {@code user} combined through
+     * {@code USER_SPECIFIC_PATH_NAME_FORMAT}.
+     *
+     * @param file the directory to derive the sibling from
+     * @param user the user name to append
+     * @return the sibling path; {@code file} itself when it is not an existing
+     *         directory or has no parent (a filesystem root)
+     */
+    private static File getFileWithUserSuffix(File file, String user) {
+        if (!file.isDirectory()) {
+            return file;
+        }
+
+        // Splitting into parent + name replaces the trailing-separator strip, which
+        // compared against File.pathSeparator (the PATH list separator) by mistake.
+        File absoluteFile = file.getAbsoluteFile();
+        File parent       = absoluteFile.getParentFile();
+
+        if (parent == null) {
+            // A filesystem root has no name to suffix; suffixing it would yield a relative path.
+            return file;
+        }
+
+        return new File(parent, String.format(USER_SPECIFIC_PATH_NAME_FORMAT, absoluteFile.getName(), user));
+    }
+
public static File getCreateDirectory(File file) {
File ret = file;
@@ -79,7 +107,7 @@ public class SpoolUtils {
boolean result = file.mkdirs();
if (!file.isDirectory() || !result) {
- LOG.error("SpoolUtils.getCreateDirectory({}): inaccessible!",
file.toString());
+ LOG.error("SpoolUtils.getCreateDirectory({}): cannot be
created!", file);
ret = null;
}
diff --git
a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
index 304c821..83971f6 100644
---
a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
+++
b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
@@ -50,7 +50,7 @@ public class BaseTest {
public SpoolConfiguration getSpoolConfiguration(String spoolDir, String
handlerName) {
SpoolConfiguration cfg = new
SpoolConfiguration(getConfiguration(spoolDir), handlerName);
- cfg.setSource(SOURCE_TEST);
+ cfg.setSource(SOURCE_TEST, "testuser");
return cfg;
}