[ https://issues.apache.org/jira/browse/GOBBLIN-1783?focusedWorklogId=845254&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-845254 ]
ASF GitHub Bot logged work on GOBBLIN-1783:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 14/Feb/23 01:07
Start Date: 14/Feb/23 01:07
Worklog Time Spent: 10m
Work Description: AndyJiang99 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1105183287
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java:
##########
@@ -358,8 +360,35 @@ public int getSizeImpl() throws IOException {
}
@Override
- public Collection<Spec> getSpecsImpl(int start, int count) throws UnsupportedOperationException {
- throw new UnsupportedOperationException();
+ public Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize)
+ throws IOException {
+ if (startOffset < 0 || batchSize < 0) {
+ throw new IOException(String.format("Received negative offset or batch size value when they should be >= 0. "
Review Comment:
Maybe IllegalArgumentException instead?
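
A minimal sketch of what this suggestion could look like, assuming the intent is to reject negative paging arguments with an unchecked IllegalArgumentException rather than an IOException. The class and method names (PaginationArgs, checkNonNegative, page) are hypothetical and not part of Gobblin; the in-memory list only stands in for the store's sorted specs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Gobblin: validates paging arguments
// and returns one page from an already-sorted in-memory list.
final class PaginationArgs {
  private PaginationArgs() {}

  // Rejects negative values as a caller error (unchecked exception).
  static void checkNonNegative(int startOffset, int batchSize) {
    if (startOffset < 0 || batchSize < 0) {
      throw new IllegalArgumentException(String.format(
          "Received negative offset or batch size value when they should be >= 0. "
              + "Offset is %d and batch size is %d", startOffset, batchSize));
    }
  }

  // Returns the elements in [startOffset, startOffset + batchSize), clamped to the list size.
  static <T> List<T> page(List<T> sorted, int startOffset, int batchSize) {
    checkNonNegative(startOffset, batchSize);
    int from = Math.min(startOffset, sorted.size());
    int to = (int) Math.min((long) from + batchSize, sorted.size());
    return new ArrayList<>(sorted.subList(from, to));
  }
}
```

One reason to prefer this: a negative offset is a programming error at the call site, not an I/O failure, so callers that already handle IOException for storage problems are not forced to treat the two cases alike.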
##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java:
##########
@@ -303,19 +303,18 @@ public void testGetAllSpecPaginate() throws Exception {
Assert.assertTrue(specs.contains(this.flowSpec4));
// Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
- specs = this.specStore.getSpecs(0,2);
+ specs = this.specStore.getSpecsPaginated(0,2);
Assert.assertEquals(specs.size(), 2);
Assert.assertTrue(specs.contains(this.flowSpec1));
Assert.assertTrue(specs.contains(this.flowSpec2));
Assert.assertFalse(specs.contains(this.flowSpec4));
- // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
- // Check that functionality for not including a start value is the same as including start value of 0
- specs = this.specStore.getSpecs(-1, 2);
+ // Return all flowSpecs from index 1 to 3 - 1. Total of 2 flowSpecs, only return second two.
+ specs = this.specStore.getSpecsPaginated(1, 2);
Review Comment:
Same comment as below: can we keep the -1 input for either OFFSET or COUNT,
just to check that the functionality to handle such edge cases exists?
##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java:
##########
@@ -325,19 +325,18 @@ public void testGetAllSpecPaginate() throws Exception {
Assert.assertTrue(specs.contains(this.flowSpec4));
// Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
- specs = this.specStore.getSpecs(0,2);
+ specs = this.specStore.getSpecsPaginated(0,2);
Assert.assertEquals(specs.size(), 2);
Assert.assertTrue(specs.contains(this.flowSpec1));
Assert.assertTrue(specs.contains(this.flowSpec2));
Assert.assertFalse(specs.contains(this.flowSpec4));
- // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
- // Check that functionality for not including a start value is the same as including start value of 0
- specs = this.specStore.getSpecs(-1, 2);
+ // Return all flowSpecs from index 1 to 3 - 1. Total of 2 flowSpecs, only return second two.
+ specs = this.specStore.getSpecsPaginated(1, 2);
Review Comment:
Could we keep the test where COUNT and/or OFFSET is negative, and assert
that we catch an error? The test was here to catch this corner case in case
the implementation changes.
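
The negative-input check requested here could look roughly like the following. To stay self-contained, the sketch uses a hypothetical in-memory stand-in (InMemorySpecStore) instead of the real MysqlSpecStore, and a plain try/catch instead of TestNG assertions. It also assumes the store rejects negative values with IllegalArgumentException, which this PR has not settled on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a spec store; the real store and its exception type may differ.
final class InMemorySpecStore {
  private final List<String> specs = new ArrayList<>();

  void addSpec(String spec) {
    specs.add(spec);
  }

  // Returns specs in [startOffset, startOffset + batchSize), clamped to the store size.
  List<String> getSpecsPaginated(int startOffset, int batchSize) {
    if (startOffset < 0 || batchSize < 0) {
      throw new IllegalArgumentException("offset and batch size must be >= 0");
    }
    int from = Math.min(startOffset, specs.size());
    int to = (int) Math.min((long) from + batchSize, specs.size());
    return new ArrayList<>(specs.subList(from, to));
  }
}

final class NegativePaginationCheck {
  private NegativePaginationCheck() {}

  // Returns true if the store rejected the arguments, false if the call succeeded.
  static boolean rejects(InMemorySpecStore store, int startOffset, int batchSize) {
    try {
      store.getSpecsPaginated(startOffset, batchSize);
      return false;
    } catch (IllegalArgumentException expected) {
      return true;
    }
  }
}
```

In the real test class, the same check would be one assertion per case, for example a negative OFFSET with a valid COUNT and a valid OFFSET with a negative COUNT.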
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -133,10 +147,22 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
this.helixManager = helixManager;
this.orchestrator = orchestrator;
this.scheduledFlowSpecs = Maps.newHashMap();
+ this.loadSpecsBatchSize = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.LOAD_SPEC_BATCH_SIZE, String.valueOf(ConfigurationKeys.DEFAULT_LOAD_SPEC_BATCH_SIZE)));
this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
&& config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
this.warmStandbyEnabled = warmStandbyEnabled;
this.quotaManager = quotaManager;
+ // Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
+ // of the scheduler. If one metric exists, then the others should as well.
+ MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS);
+ if (metricContext.getGauges(filter).isEmpty()) {
+// this.averageGetSpecTimeMillis =
Review Comment:
Remove unnecessary comments here
Issue Time Tracking
-------------------
Worklog Id: (was: 845254)
Time Spent: 1.5h (was: 1h 20m)
> Initialize scheduler with batch gets instead of individual get per flow
> -----------------------------------------------------------------------
>
> Key: GOBBLIN-1783
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1783
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> We seek to improve the initialization time of the JobScheduler upon restart or
> leadership change by batching the MySQL queries that fetch flow specs.
> Instead of making one MySQL get call for each flow execution id, which scales
> extremely poorly with the number of flows, we should group them to reduce the
> number of calls and the downtime.
> This implementation adds two new functions to the SpecStore interface,
> getSortedSpecURIs and getBatchedSpecs, that we use to achieve the batching.
> Because these two functions are generic enough to be used in derived
> classes of SpecStore, we add them to the base class. Although this
> requires every child class to implement them, it allows any
> consumer of the parent class SpecStore to use this functionality without
> caring about the specific SpecStore implementation in use (as
> JobScheduler does). Additionally, getBatchedSpecs requires an offset, or
> starting point, from which to fetch each batch, so the consumer has to do some
> bookkeeping of its position in the paginated gets, but this again separates the
> functionality from the consumer's use case. The entirety of the flow
> catalog is too large for the Scheduler to load into memory, so we use this
> batch functionality.
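
The offset bookkeeping described above can be sketched as a simple loop on the consumer side. This is an illustration under stated assumptions, not the Gobblin implementation: PagedSource and BatchLoader are hypothetical names, and the loop assumes the source returns results in a stable sorted order and signals the end of the data with a batch shorter than the requested size.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical abstraction over a store that supports paginated reads.
interface PagedSource<T> {
  List<T> getBatch(int startOffset, int batchSize);
}

// Hypothetical consumer-side loader that keeps track of the current offset.
final class BatchLoader {
  private BatchLoader() {}

  // Feeds every element to the handler one batch at a time and
  // returns the number of calls made to the source.
  static <T> int loadAll(PagedSource<T> source, int batchSize, Consumer<T> handler) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be > 0");
    }
    int offset = 0;
    int calls = 0;
    while (true) {
      List<T> batch = source.getBatch(offset, batchSize);
      calls++;
      batch.forEach(handler);
      if (batch.size() < batchSize) {
        break; // a short (or empty) batch means the source is exhausted
      }
      offset += batch.size(); // bookkeeping: advance past what was just read
    }
    return calls;
  }
}
```

Only one batch is held in memory at a time, and the number of calls grows with the number of elements divided by the batch size rather than with the number of elements.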
--
This message was sent by Atlassian Jira
(v8.20.10#820010)