[ 
https://issues.apache.org/jira/browse/GOBBLIN-2002?focusedWorklogId=905392&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-905392
 ]

ASF GitHub Bot logged work on GOBBLIN-2002:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Feb/24 18:36
            Start Date: 16/Feb/24 18:36
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3878:
URL: https://github.com/apache/gobblin/pull/3878#discussion_r1492808016


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not 
Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new 
MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
+    Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
+    DagNodeId dagNodeId = 
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.checkpointDag(dag);
+    this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);

Review Comment:
   we did a `checkpointDag` prior to `addDagNodeState` for `dag`, but not for 
`dag2`.
   
   I suspect we should throw an exception if asked to `addDagNodeState` to an 
unknown/uncheckpointed `Dag`.  what's your take?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 905392)
    Time Spent: 5h 20m  (was: 5h 10m)

> create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, 
> DagStateStore and in-memory dag maps used in DagManager
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2002
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2002
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> this will help in refactoring DagManager



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to