AndyJiang99 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1105183287


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java:
##########
@@ -358,8 +360,35 @@ public int getSizeImpl() throws IOException {
   }
 
   @Override
-  public Collection<Spec> getSpecsImpl(int start, int count) throws UnsupportedOperationException {
-    throw new UnsupportedOperationException();
+  public Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize)
+      throws IOException {
+    if (startOffset < 0 || batchSize < 0) {
+      throw new IOException(String.format("Received negative offset or batch size value when they should be >= 0. "

Review Comment:
   Maybe throw IllegalArgumentException instead? A negative offset or batch size is a caller error, not an I/O failure.
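
   A minimal sketch of that suggestion, not the PR's actual code: the class `PagingArgs`, its helpers `validate` and `page`, and the message text are hypothetical names used only for illustration. It assumes negative arguments should be rejected up front and that the page is clamped to the available items.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, not part of FSSpecStore.
class PagingArgs {
  // Rejects negative paging arguments as a caller error (unchecked exception)
  // instead of reporting them as an I/O failure.
  static void validate(int startOffset, int batchSize) {
    if (startOffset < 0 || batchSize < 0) {
      throw new IllegalArgumentException(String.format(
          "offset and batch size must be >= 0, got offset=%d, batchSize=%d",
          startOffset, batchSize));
    }
  }

  // Returns the items in [startOffset, startOffset + batchSize), clamped to the list size.
  static <T> List<T> page(List<T> all, int startOffset, int batchSize) {
    validate(startOffset, batchSize);
    int from = Math.min(startOffset, all.size());
    // Widen to long so a very large batchSize cannot overflow the upper bound.
    int to = (int) Math.min((long) from + batchSize, all.size());
    return new ArrayList<>(all.subList(from, to));
  }
}
```

   One consequence of this choice: IllegalArgumentException is unchecked, so callers would no longer be forced to handle bad arguments through the `throws IOException` clause.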



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java:
##########
@@ -303,19 +303,18 @@ public void testGetAllSpecPaginate() throws Exception {
     Assert.assertTrue(specs.contains(this.flowSpec4));
 
     // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
-    specs = this.specStore.getSpecs(0,2);
+    specs = this.specStore.getSpecsPaginated(0,2);
     Assert.assertEquals(specs.size(), 2);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
     Assert.assertFalse(specs.contains(this.flowSpec4));
 
-    // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
-    // Check that functionality for not including a start value is the same as including start value of 0
-    specs = this.specStore.getSpecs(-1, 2);
+    // Return all flowSpecs from index 1 to 3 - 1. Total of 2 flowSpecs, only return second two.
+    specs = this.specStore.getSpecsPaginated(1, 2);

Review Comment:
   Same comment as below: can we keep the -1 input for either OFFSET or COUNT,
just to check that the handling for such edge cases exists?



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java:
##########
@@ -325,19 +325,18 @@ public void testGetAllSpecPaginate() throws Exception {
     Assert.assertTrue(specs.contains(this.flowSpec4));
 
     // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
-    specs = this.specStore.getSpecs(0,2);
+    specs = this.specStore.getSpecsPaginated(0,2);
     Assert.assertEquals(specs.size(), 2);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
     Assert.assertFalse(specs.contains(this.flowSpec4));
 
-    // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
-    // Check that functionality for not including a start value is the same as including start value of 0
-    specs = this.specStore.getSpecs(-1, 2);
+    // Return all flowSpecs from index 1 to 3 - 1. Total of 2 flowSpecs, only return second two.
+    specs = this.specStore.getSpecsPaginated(1, 2);

Review Comment:
   Could we keep the test where COUNT and/or OFFSET is negative, and assert
that we catch an error? The test was here to catch this corner case in case
the implementation changes.
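
   A rough sketch of such a check, written as plain Java so it runs without the test framework. Everything here is hypothetical: `NegativePagingCheck`, the stand-in `getSpecsPaginated`, and the helper `throwsExpected` are illustration only, and the exception type the real store throws depends on its implementation. In the actual test class, the framework's own exception assertion (for example TestNG's `Assert.assertThrows`, assuming the project's version provides it) would replace the helper.

```java
import java.util.Arrays;
import java.util.List;

class NegativePagingCheck {
  // Stand-in for the spec store (hypothetical): rejects a negative OFFSET or COUNT.
  static List<String> getSpecsPaginated(List<String> all, int offset, int count) {
    if (offset < 0 || count < 0) {
      throw new IllegalArgumentException("offset and count must be >= 0");
    }
    int from = Math.min(offset, all.size());
    int to = (int) Math.min((long) from + count, all.size());
    return all.subList(from, to);
  }

  // Returns true only if running the action throws an instance of the expected type.
  static boolean throwsExpected(Class<? extends Throwable> expected, Runnable action) {
    try {
      action.run();
      return false;
    } catch (Throwable t) {
      return expected.isInstance(t);
    }
  }

  public static void main(String[] args) {
    List<String> specs = Arrays.asList("flowSpec1", "flowSpec2", "flowSpec4");
    // Both negative inputs should be rejected; a valid page should not be.
    System.out.println(throwsExpected(IllegalArgumentException.class,
        () -> getSpecsPaginated(specs, -1, 2)));
    System.out.println(throwsExpected(IllegalArgumentException.class,
        () -> getSpecsPaginated(specs, 0, -1)));
    System.out.println(throwsExpected(IllegalArgumentException.class,
        () -> getSpecsPaginated(specs, 1, 2)));
  }
}
```

   Keeping one negative-OFFSET case and one negative-COUNT case means a later change to either check fails a test instead of passing silently.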



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -133,10 +147,22 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
     this.helixManager = helixManager;
     this.orchestrator = orchestrator;
     this.scheduledFlowSpecs = Maps.newHashMap();
+    this.loadSpecsBatchSize = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.LOAD_SPEC_BATCH_SIZE, String.valueOf(ConfigurationKeys.DEFAULT_LOAD_SPEC_BATCH_SIZE)));
     this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
         && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
     this.warmStandbyEnabled = warmStandbyEnabled;
     this.quotaManager = quotaManager;
+    // Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
+    // of the scheduler. If one metric exists, then the others should as well.
+    MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS);
+    if (metricContext.getGauges(filter).isEmpty()) {
+//      this.averageGetSpecTimeMillis =

Review Comment:
   Remove the unnecessary commented-out code here.


