[ https://issues.apache.org/jira/browse/GOBBLIN-1696?focusedWorklogId=810184&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-810184 ]
ASF GitHub Bot logged work on GOBBLIN-1696:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 19/Sep/22 21:19
Start Date: 19/Sep/22 21:19
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3548:
URL: https://github.com/apache/gobblin/pull/3548#discussion_r974655361
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1022,8 +1022,14 @@ public class ConfigurationKeys {
* Configuration properties related to flowGraphs
*/
- public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
- public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+ public static final String JAVA_PROPS_EXTENSIONS = "flowGraph.javaPropsExtensions";
Review Comment:
Maybe include flowGraph in the config name as well?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -197,10 +200,9 @@ private boolean checkFilePath(String file, int depth) {
Path filePath = new Path(file);
Review Comment:
According to the comment on line 193, file is a relative path, but the code
seems to expect an absolute path. Can we update the javadoc so that it
reflects the correct expectation?
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java:
##########
@@ -42,73 +45,83 @@ public Object getDecoratedObject() {
@Override
public void onStart(PathAlterationObserver observer) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onStart(observer);
- } catch (Throwable exc) {
- log.error("onStart failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onFileCreate(Path path) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onFileCreate(path);
- } catch (Throwable exc) {
- log.error("onFileCreate failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onFileChange(Path path) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onFileChange(path);
- } catch (Throwable exc) {
- log.error("onFileChange failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onStop(PathAlterationObserver observer) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onStop(observer);
- } catch (Throwable exc) {
- log.error("onStop failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onDirectoryCreate(Path directory) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onDirectoryCreate(directory);
- } catch (Throwable exc) {
- log.error("onDirectoryCreate failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onDirectoryChange(Path directory) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onDirectoryChange(directory);
- } catch (Throwable exc) {
- log.error("onDirectoryChange failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onDirectoryDelete(Path directory) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onDirectoryDelete(directory);
- } catch (Throwable exc) {
- log.error("onDirectoryDelete failure: ", exc);
- }
+ return null;
+ });
}
@Override
public void onFileDelete(Path path) {
- try {
+ logSwallowedThrowable(() -> {
this.underlying.onFileDelete(path);
+ return null;
+ });
+ }
+
+ @Override
+ public void onCheckDetectedChange() {
+ logSwallowedThrowable(() -> {
+ this.underlying.onCheckDetectedChange();
+ return null;
+ });
+ }
+
+ protected void logSwallowedThrowable(Callable<Void> c) {
+ try {
+ c.call();
} catch (Throwable exc) {
- log.error("onFileDelete failure: ", exc);
+ String methodName = Arrays.stream(exc.getStackTrace()).findFirst().get().getMethodName();
+ log.error(methodName + " failed due to exception:", exc);
Review Comment:
Shouldn't we emit some metrics to indicate this failure? I feel this might
end up as a silent failure that we cannot catch immediately.
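One way the metric suggestion could look, as a minimal sketch rather than the PR's actual change: count each swallowed failure alongside the log line. The class `CountingSwallower`, its `AtomicLong` counter, and the explicit `callbackName` parameter are all hypothetical; a real change would presumably report through the project's metrics library instead. Passing the callback name explicitly is also a choice of this sketch, since the first stack frame of a Throwable may name the method that threw rather than the listener callback.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the decorator discussed above; not Gobblin's actual class.
class CountingSwallower {
  // Hypothetical counter; stands in for whatever metric the service would emit.
  private final AtomicLong swallowedFailures = new AtomicLong();

  // Runs the callback; logs and counts any Throwable instead of propagating it.
  void logSwallowedThrowable(String callbackName, Callable<Void> c) {
    try {
      c.call();
    } catch (Throwable exc) {
      swallowedFailures.incrementAndGet();
      System.err.println(callbackName + " failed due to exception: " + exc);
    }
  }

  long getSwallowedFailures() {
    return swallowedFailures.get();
  }
}

class CountingSwallowerDemo {
  public static void main(String[] args) {
    CountingSwallower s = new CountingSwallower();
    // This callback throws; the failure is swallowed but counted.
    s.logSwallowedThrowable("onFileCreate", () -> {
      throw new IllegalStateException("boom");
    });
    // This callback succeeds; the counter is not touched.
    s.logSwallowedThrowable("onFileChange", () -> null);
    System.out.println("swallowed failures: " + s.getSwallowedFailures());
  }
}
```

With a counter like this, an alert on a non-zero value would surface failures that otherwise only appear in the logs.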
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java:
##########
@@ -121,7 +124,7 @@ public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrum
MultiHopFlowCompiler.log.warn("Exception reading data node alias map, ignoring it.", e);
}
- this.flowGraph = new BaseFlowGraph(dataNodeAliasMap);
+ this.flowGraph = new AtomicReference<>(new BaseFlowGraph(dataNodeAliasMap));
Review Comment:
Can we add a comment explaining why we want to use an AtomicReference here?
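For context, a minimal sketch of the pattern an AtomicReference usually enables, assuming the intent matches the issue's "update atomically" requirement: a new graph is built off to the side and then published in a single step, so readers see either the old graph or the new one, never a partially updated one. `GraphSnapshot` and `GraphHolder` are hypothetical names for illustration, not Gobblin classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot standing in for a flowgraph; not Gobblin's BaseFlowGraph.
final class GraphSnapshot {
  private final Map<String, String> edges;

  GraphSnapshot(Map<String, String> edges) {
    // Defensive copy so later changes to the caller's map cannot leak in.
    this.edges = Collections.unmodifiableMap(new HashMap<>(edges));
  }

  Map<String, String> getEdges() {
    return edges;
  }
}

// Hypothetical holder showing the swap; readers and the reloader share one reference.
class GraphHolder {
  private final AtomicReference<GraphSnapshot> current =
      new AtomicReference<>(new GraphSnapshot(Collections.emptyMap()));

  // Readers grab the reference once and use that snapshot for the whole operation.
  GraphSnapshot get() {
    return current.get();
  }

  // The reloader builds a complete new snapshot, then publishes it in one step.
  void reload(Map<String, String> newEdges) {
    current.set(new GraphSnapshot(newEdges));
  }
}

class GraphHolderDemo {
  public static void main(String[] args) {
    GraphHolder holder = new GraphHolder();
    GraphSnapshot before = holder.get();

    Map<String, String> edges = new HashMap<>();
    edges.put("hdfs-1", "adls-1");
    holder.reload(edges);

    // The snapshot taken earlier is unchanged; new readers see the new graph.
    System.out.println("old edges: " + before.getEdges());
    System.out.println("new edges: " + holder.get().getEdges());
  }
}
```

A reader that obtained a snapshot before the reload keeps a consistent view for the rest of its work, which is the property a code comment at this line could call out.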
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1022,8 +1022,14 @@ public class ConfigurationKeys {
* Configuration properties related to flowGraphs
*/
- public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
- public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+ public static final String JAVA_PROPS_EXTENSIONS = "flowGraph.javaPropsExtensions";
+ public static final String HOCON_FILE_EXTENSIONS = "flowGraph.hoconFileExtensions";
Review Comment:
Same as above
Issue Time Tracking
-------------------
Worklog Id: (was: 810184)
Time Spent: 2h 10m (was: 2h)
> Build a file-based flowgraph that watches for changes and updates
> -----------------------------------------------------------------
>
> Key: GOBBLIN-1696
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1696
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Gobblin-as-a-Service only has a Git based flowgraph, which is difficult to
> build CI/CD around. We can provide an alternate flowgraph that is just based
> off files. This flowgraph should update atomically.