This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f0904f3 [GOBBLIN-1241] Allow nodes/edges in HOCON format and delay
resolution to allow overriding
f0904f3 is described below
commit f0904f3fae2e12b995251ad349760a1bf4a3a0d3
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Aug 18 14:15:58 2020 -0700
[GOBBLIN-1241] Allow nodes/edges in HOCON format and delay resolution to
allow overriding
Closes #3083 from jack-moseley/data-node-resolve
---
.../gobblin/service/modules/core/GitFlowGraphMonitor.java | 8 ++++----
.../gobblin/service/modules/core/GitMonitoringService.java | 5 +++--
.../gobblin/service/modules/flowgraph/BaseDataNode.java | 13 ++++++++-----
.../modules/flowgraph/datanodes/fs/FileSystemDataNode.java | 2 +-
.../main/java/org/apache/gobblin/util/PullFileLoader.java | 14 +++++++++++---
5 files changed, 27 insertions(+), 15 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 9c1d105..e4f1ef1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -68,7 +68,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService
{
public static final String GIT_FLOWGRAPH_MONITOR_PREFIX =
"gobblin.service.gitFlowGraphMonitor";
private static final String PROPERTIES_EXTENSIONS = "properties";
- private static final String CONF_EXTENSIONS = StringUtils.EMPTY;
+ private static final String CONF_EXTENSIONS = "conf";
private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR =
"git-flowgraph";
private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR =
"gobblin-flowgraph";
@@ -283,7 +283,7 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
Path filePath = new Path(file);
String fileExtension = Files.getFileExtension(filePath.getName());
if (filePath.depth() != depth || !checkFileLevelRelativeToRoot(filePath,
depth)
- || !(this.javaPropsExtensions.contains(fileExtension))) {
+ || !(this.javaPropsExtensions.contains(fileExtension) ||
this.hoconFileExtensions.contains(fileExtension))) {
log.warn("Changed file does not conform to directory structure and file
name format, skipping: "
+ filePath);
return false;
@@ -364,7 +364,7 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
* @throws IOException
*/
private Config loadNodeFileWithOverrides(Path filePath) throws IOException {
- Config nodeConfig = this.pullFileLoader.loadPullFile(filePath,
emptyConfig, false);
+ Config nodeConfig = this.pullFileLoader.loadPullFile(filePath,
emptyConfig, false, false);
return getNodeConfigWithOverrides(nodeConfig, filePath);
}
@@ -375,7 +375,7 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
* @throws IOException
*/
private Config loadEdgeFileWithOverrides(Path filePath) throws IOException {
- Config edgeConfig = this.pullFileLoader.loadPullFile(filePath,
emptyConfig, false);
+ Config edgeConfig = this.pullFileLoader.loadPullFile(filePath,
emptyConfig, false, false);
return getEdgeConfigWithOverrides(edgeConfig, filePath);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index d455ad8..05d7b9a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -101,6 +101,7 @@ public abstract class GitMonitoringService extends
AbstractIdleService {
final String folderName;
final PullFileLoader pullFileLoader;
final Set<String> javaPropsExtensions;
+ final Set<String> hoconFileExtensions;
protected volatile boolean isActive = false;
@@ -169,11 +170,11 @@ public abstract class GitMonitoringService extends
AbstractIdleService {
Path folderPath = new Path(this.repositoryDir, this.folderName);
this.javaPropsExtensions =
Sets.newHashSet(config.getString(JAVA_PROPS_EXTENSIONS).split(","));
- Set<String> hoconFileExtensions =
Sets.newHashSet(config.getString(HOCON_FILE_EXTENSIONS).split(","));
+ this.hoconFileExtensions =
Sets.newHashSet(config.getString(HOCON_FILE_EXTENSIONS).split(","));
try {
this.pullFileLoader = new PullFileLoader(folderPath,
FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new
Configuration()),
- this.javaPropsExtensions, hoconFileExtensions);
+ this.javaPropsExtensions, this.hoconFileExtensions);
} catch (IOException e) {
throw new RuntimeException("Could not create pull file loader", e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
index a3045fd..a4e0d54 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -34,24 +34,27 @@ import org.apache.gobblin.util.ConfigUtils;
*/
@Alpha
@Slf4j
-@EqualsAndHashCode (exclude = {"rawConfig", "active"})
+@EqualsAndHashCode (exclude = {"rawConfig", "resolvedConfig", "active"})
public class BaseDataNode implements DataNode {
@Getter
private String id;
@Getter
private Config rawConfig;
@Getter
+ private Config resolvedConfig;
+ @Getter
private boolean active = true;
public BaseDataNode(Config nodeProps) throws DataNodeCreationException {
try {
- String nodeId = ConfigUtils.getString(nodeProps,
FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, "");
+ this.rawConfig = nodeProps;
+ this.resolvedConfig = nodeProps.resolve();
+ String nodeId = ConfigUtils.getString(this.resolvedConfig,
FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, "");
Preconditions.checkArgument(!Strings.isNullOrEmpty(nodeId), "Node Id
cannot be null or empty");
this.id = nodeId;
- if
(nodeProps.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY)) {
- this.active =
nodeProps.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY);
+ if
(this.resolvedConfig.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY))
{
+ this.active =
this.resolvedConfig.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY);
}
- this.rawConfig = nodeProps;
} catch (Exception e) {
throw new DataNodeCreationException(e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
index b252930..159735a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
@@ -52,7 +52,7 @@ public abstract class FileSystemDataNode extends BaseDataNode
{
public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException
{
super(nodeProps);
try {
- this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
+ this.fsUri = ConfigUtils.getString(this.getResolvedConfig(), FS_URI_KEY,
"");
Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "fs.uri
cannot be null or empty.");
//Validate the srcFsUri and destFsUri of the DataNode.
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
index 5e0ce90..6458683 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
@@ -134,19 +134,27 @@ public class PullFileLoader {
* @param sysProps A {@link Config} used as fallback.
* @param loadGlobalProperties if true, will also load at most one
*.properties file per directory from the
* {@link #rootDirectory} to the pull file {@link Path}.
+ * @param resolve if true, call {@link Config#resolve()} on the config after
loading it
* @return The loaded {@link Config}.
* @throws IOException
*/
- public Config loadPullFile(Path path, Config sysProps, boolean
loadGlobalProperties) throws IOException {
+ public Config loadPullFile(Path path, Config sysProps, boolean
loadGlobalProperties, boolean resolve) throws IOException {
Config fallback = loadGlobalProperties ? loadAncestorGlobalConfigs(path,
sysProps) : sysProps;
+ Config loadedConfig;
if (this.javaPropsPullFileFilter.accept(path)) {
- return loadJavaPropsWithFallback(path, fallback).resolve();
+ loadedConfig = loadJavaPropsWithFallback(path, fallback);
} else if (this.hoconPullFileFilter.accept(path)) {
- return loadHoconConfigAtPath(path).withFallback(fallback).resolve();
+ loadedConfig = loadHoconConfigAtPath(path).withFallback(fallback);
} else {
throw new IOException(String.format("Cannot load pull file %s due to
unrecognized extension.", path));
}
+
+ return resolve ? loadedConfig.resolve() : loadedConfig;
+ }
+
+ public Config loadPullFile(Path path, Config sysProps, boolean
loadGlobalProperties) throws IOException {
+ return loadPullFile(path, sysProps, loadGlobalProperties, true);
}
/**