phet commented on code in PR #4041:
URL: https://github.com/apache/gobblin/pull/4041#discussion_r1750642724
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json:
##########
@@ -264,11 +264,14 @@
"deprecated" : "Use FlowExecution instead"
}, "org.apache.gobblin.service.FlowStatusId",
"org.apache.gobblin.service.Issue", "org.apache.gobblin.service.IssueSeverity",
"org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState",
"org.apache.gobblin.service.JobStatistics",
"org.apache.gobblin.service.JobStatus", "org.apache.gobblin.service.Timestamp"
],
"schema" : {
+ "annotations" : {
+ "deprecated" : { }
+ },
"name" : "flowstatuses",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowstatuses",
"schema" : "org.apache.gobblin.service.FlowStatus",
- "doc" : "Resource for handling flow status requests\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
+ "doc" : "Resource for handling flow status requests. Deprecated, use
{@link FlowExecutionResource}\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
Review Comment:
same advice here as for the restspec `doc` string: announce the deprecation first
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowExecutionResourceHandler.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.inject.Inject;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.RestLiServiceException;
+import com.linkedin.restli.server.UpdateResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.FlowExecution;
+import org.apache.gobblin.service.FlowExecutionResource;
+import org.apache.gobblin.service.FlowExecutionResourceHandlerInterface;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.FlowStatusId;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.monitoring.FlowStatus;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+
+
+@Slf4j
+public class FlowExecutionResourceHandler implements
FlowExecutionResourceHandlerInterface {
Review Comment:
could we add class-level javadoc?
also, the diff isn't showing this as a rename, but is that what it is?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json:
##########
@@ -1,9 +1,12 @@
{
+ "annotations" : {
+ "deprecated" : { }
+ },
"name" : "flowstatuses",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowstatuses",
"schema" : "org.apache.gobblin.service.FlowStatus",
- "doc" : "Resource for handling flow status requests\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
+ "doc" : "Resource for handling flow status requests. Deprecated, use {@link
FlowExecutionResource}\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
Review Comment:
I suggest announcing the deprecation first:
```
Deprecated, use {@link FlowExecutionResource}\n\nResource for handling flow
status requests\n\ngenerated from:
org.apache.gobblin.service.FlowStatusResource",
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -289,7 +280,7 @@ private void registerServicesInLauncher(){
private void configureServices(){
if (configuration.isRestLIServerEnabled()) {
this.restliServer = EmbeddedRestliServer.builder()
- .resources(Lists.newArrayList(FlowConfigsResource.class,
FlowConfigsV2Resource.class))
+ .resources(Lists.newArrayList(FlowConfigsV2Resource.class,
FlowConfigsV2Resource.class))
Review Comment:
`FlowConfigsV2Resource.class` is listed twice here; can we list it just once?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/FlowExecutionTest.java:
##########
@@ -30,29 +30,28 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
-import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
-import com.google.inject.Module;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.restli.FlowExecutionResourceHandler;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import static org.mockito.Mockito.mock;
@Test(groups = { "gobblin.service" }, singleThreaded = true)
-public class FlowStatusTest {
- private FlowStatusClient _client;
+public class FlowExecutionTest {
Review Comment:
`FlowExecutionClientSideTest`?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -60,14 +63,22 @@
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowConfigLoggedException;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.FlowStatusId;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.Schedule;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
-/**
- * A {@link FlowConfigsResourceHandler} that handles Restli locally.
- */
@Slf4j
-public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandler {
+public class FlowConfigsV2ResourceHandler {
Review Comment:
why not implement a `FlowConfigsV2ResourceHandlerInterface`, like
flowExecutions does?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java:
##########
@@ -29,6 +29,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler;
public class FlowConfigResourceLocalHandlerTest {
Review Comment:
shouldn't this be renamed to V2? also, hasn't the "resource local handler"
naming convention gone away?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.TimeUnit;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Tests the state updates (including updating in-memory state and
MysqlDagActionStore) after performing add or cancel
+ * operations by calling addDag, stopDag, kill, and resume. It also tests
flows with and without sla configs.
+ */
+public class DagFlowTest {
+
+ @Test
+ void slaConfigCheck() throws Exception {
+ Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("5", 123456783L,
"FINISH_RUNNING", 1);
+
Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)),
ServiceConfigKeys.DEFAULT_FLOW_FINISH_DEADLINE_MILLIS);
+
+ Config jobConfig =
dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+ jobConfig = jobConfig
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef("7"))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+ dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
+
Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)),
TimeUnit.SECONDS.toMillis(7L));
+
+ jobConfig = jobConfig
Review Comment:
can we use a fresh name, like `jobConfig2`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -114,7 +114,6 @@ Metrics need to be created before initializeMonitor() below
is called (or more s
protected void assignTopicPartitions() {
// Expects underlying consumer to handle initializing partitions and
offset for the topic -
// subscribe to all partitions from latest offset
Review Comment:
I'm a bit fuzzy on this (and wondering whether it should be an `abstract`
method).
if the empty but concrete impl is actually what we want, is it possible to
name the specific method involved in the "underlying consumer"?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.TimeUnit;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Tests the state updates (including updating in-memory state and
MysqlDagActionStore) after performing add or cancel
+ * operations by calling addDag, stopDag, kill, and resume. It also tests
flows with and without sla configs.
+ */
+public class DagFlowTest {
+
+ @Test
+ void slaConfigCheck() throws Exception {
Review Comment:
this javadoc covers a lot, but the test seems to mostly just exercise
`DagUtils.getFlowFinishDeadline`. is there more to it that I'm not
grasping?
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandlerTest.java:
##########
@@ -17,50 +17,49 @@
package org.apache.gobblin.service;
-
import org.testng.Assert;
import org.testng.annotations.Test;
public class FlowExecutionResourceLocalHandlerTest {
Review Comment:
let's rename, since the `FlowExecutionResourceLocalHandler` looks to have
been deleted
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]