[
https://issues.apache.org/jira/browse/GOBBLIN-2115?focusedWorklogId=926550&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-926550
]
ASF GitHub Bot logged work on GOBBLIN-2115:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jul/24 20:08
Start Date: 18/Jul/24 20:08
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3999:
URL: https://github.com/apache/gobblin/pull/3999#discussion_r1683416757
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -116,128 +112,78 @@ public void removeFlowSpec(URI uri, Properties headers,
boolean triggerListener)
this.flowCatalog.remove(uri, headers, triggerListener);
}
- public synchronized void setTopologySpecMap(Map<URI, TopologySpec>
topologySpecMap) throws IOException {
+ public synchronized void setTopologySpecMap(Map<URI, TopologySpec>
topologySpecMap) {
this.topologySpecMap = topologySpecMap;
start();
}
- private DagStateStore createDagStateStore(Config config, Map<URI,
TopologySpec> topologySpecMap) {
+ private DagStateStoreWithDagNodes createDagStateStore(Config config,
Map<URI, TopologySpec> topologySpecMap) {
try {
- Class<?> dagStateStoreClass =
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStore.class.getName()));
- return (DagStateStore)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config,
topologySpecMap);
+ Class<?> dagStateStoreClass =
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreWithDagNodes.class.getName()));
+ return (DagStateStoreWithDagNodes)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config,
topologySpecMap);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
@Override
- public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
Review Comment:
I think this makes sense primarily if the dag is immutable (i.e. you are
certain when calling this line that you are creating a new row and adding a dag
that already exists may result in an error) otherwise checkpoint may still make
sense here semantically.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -66,25 +65,11 @@ public interface DagManagementStateStore {
*/
void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
- /**
- @return whether `dagId` is currently known due to {@link
DagManagementStateStore#checkpointDag} but not yet
- {@link DagManagementStateStore#deleteDag}
- */
- boolean containsDag(DagManager.DagId dagId) throws IOException;
-
/**
@return the {@link Dag}, if present
*/
Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws
IOException;
- /**
- * Delete the {@link Dag} from the backing store, typically called upon
completion of execution.
- * @param dag The dag completed/cancelled execution on {@link
org.apache.gobblin.runtime.api.SpecExecutor}.
- */
- default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
- deleteDag(DagManagerUtils.generateDagId(dag));
- }
-
Review Comment:
IMO it comes down to how the callers perform work. Do they only store IDs
and this implementation of the state store does all the mutation and
modification of the dags? If so, then we should have a consistent API that
takes in DagIds as the function parameters. If the callers themselves are
modifying dags, and this class is only used as a storage mechanism, may make
sense to just accept a full dag as the parameter for better encapsulation.
Issue Time Tracking
-------------------
Worklog Id: (was: 926550)
Time Spent: 2h (was: 1h 50m)
> implement fully mysql based DMSS
> --------------------------------
>
> Key: GOBBLIN-2115
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2115
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)