This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b069c1345 [GOBBLIN-1675] Add pagination for GaaS on server side (#3533)
b069c1345 is described below

commit b069c134509d28322b08562d4b69954f2dff99c2
Author: Andy Jiang <[email protected]>
AuthorDate: Thu Aug 18 13:28:53 2022 -0700

    [GOBBLIN-1675] Add pagination for GaaS on server side (#3533)
    
    * added mysql statement and take in pageContext
    
    * Working functionality for count and start in api layer
    
    * Revert unnecessary changes
    
    * Revert unnecessary changes
    
    * Add tests for pagination with filterFlow
    
    * Handle start and count for getall call
    
    * Handle start and count for getAll and getFilterFlows call in API layer of 
GaaS
    
    * Add tests and fix for pagination for getAll
    
    * Refactor conditions
    
    * Fixed tests
    
    * Add in condition to separate user defined count and start from default 
values
    
    * Fix checkstyle
    
    * Handle get all with pagination case separately from filter
    
    * Completed ServiceManagerTest for getAll
    
    * remove modified_time field from Get all statement
    
    * Completed GobblinServiceManager tests for filtered flows with pagination
    
    * Remove separate call for tests
    
    * Add context for debug logs and documentation
    
    * Update user.name and user.email on commits
    
    * Add in second ordering field
    
    * Fix testGitCreate test by utilizing json
    
    Co-authored-by: Andy Jiang <[email protected]>
---
 .../apache/gobblin/service/FlowConfigV2Client.java |  32 +++++
 .../service/FlowConfigResourceLocalHandler.java    |   7 +
 .../service/FlowConfigsResourceHandler.java        |   4 +
 .../gobblin/service/FlowConfigsV2Resource.java     |  25 +++-
 .../gobblin/runtime/api/FlowSpecSearchObject.java  |  25 +++-
 .../gobblin/runtime/api/InstrumentedSpecStore.java |   6 +
 .../org/apache/gobblin/runtime/api/SpecStore.java  |   6 +
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  |  17 +++
 .../gobblin/runtime/spec_store/FSSpecStore.java    |   5 +
 .../runtime/spec_store/MysqlBaseSpecStore.java     |  18 +++
 .../runtime/spec_store/MysqlSpecStoreTest.java     |  78 +++++++++++
 .../GobblinServiceFlowConfigResourceHandler.java   |   5 +
 .../gobblin/service/GobblinServiceManagerTest.java | 146 ++++++++++++++++++++-
 gradle/scripts/dependencyDefinitions.gradle        |   4 +-
 14 files changed, 367 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index ed9972c8b..f45ecf807 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -205,6 +205,19 @@ public class FlowConfigV2Client implements Closeable {
     return response.getEntity().getElements();
   }
 
+  /**
+   * Get all {@link FlowConfig}s
+   * @return all {@link FlowConfig}s within the range of start + count - 1, 
inclusive
+   * @throws RemoteInvocationException
+   */
+  public Collection<FlowConfig> getAllFlowConfigs(int start, int count) throws 
RemoteInvocationException {
+    LOG.debug("getAllFlowConfigs with pagination called. Start: {}. Count: 
{}.", start, count);
+
+    GetAllRequest<FlowConfig> getRequest = 
_flowconfigsV2RequestBuilders.getAll().paginate(start, count).build();
+    Response<CollectionResponse<FlowConfig>> response = 
_restClient.get().sendRequest(getRequest).getResponse();
+    return response.getEntity().getElements();
+  }
+
   /**
    * Get all {@link FlowConfig}s that matches the provided parameters. All the 
parameters are optional.
    * If a parameter is null, it is ignored. {@see 
FlowConfigV2Resource#getFilteredFlows}
@@ -224,6 +237,25 @@ public class FlowConfigV2Client implements Closeable {
     return response.getEntity().getElements();
   }
 
+  /**
+   * Get all {@link FlowConfig}s that matches the provided parameters. All the 
parameters are optional.
+   * If a parameter is null, it is ignored. {@see 
FlowConfigV2Resource#getFilteredFlows}
+   */
+  public Collection<FlowConfig> getFlowConfigs(String flowGroup, String 
flowName, String templateUri, String userToProxy,
+      String sourceIdentifier, String destinationIdentifier, String schedule, 
Boolean isRunImmediately, String owningGroup,
+      String propertyFilter, int start, int count) throws 
RemoteInvocationException {
+    LOG.debug("getFilteredFlows pagination called. flowGroup: {}, flowName: 
{}, start: {}, count: {}.", flowGroup, flowName, start, count);
+
+    FindRequest<FlowConfig> getRequest = 
_flowconfigsV2RequestBuilders.findByFilterFlows()
+        
.flowGroupParam(flowGroup).flowNameParam(flowName).templateUriParam(templateUri).userToProxyParam(userToProxy)
+        
.sourceIdentifierParam(sourceIdentifier).destinationIdentifierParam(destinationIdentifier).scheduleParam(schedule)
+        
.isRunImmediatelyParam(isRunImmediately).owningGroupParam(owningGroup).propertyFilterParam(propertyFilter).paginate(start,
 count).build();
+
+    Response<CollectionResponse<FlowConfig>> response = 
_restClient.get().sendRequest(getRequest).getResponse();
+
+    return response.getEntity().getElements();
+  }
+
   /**
    * Delete a flow configuration
    * @param flowId identifier of flow configuration to delete
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 7e91c9d1a..5837359d1 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -112,6 +112,13 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
     return 
flowCatalog.getAllSpecs().stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
   }
 
+  /**
+   * Get all flow configs in between start and start + count - 1
+   */
+  public Collection<FlowConfig> getAllFlowConfigs(int start, int count) {
+    return flowCatalog.getAllSpecs(start, 
count).stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
+  }
+
   /**
    * Add flowConfig locally and trigger all listeners iff @param 
triggerListener is set to true
    */
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
index cc224eb5f..6f6ace9c9 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
@@ -41,6 +41,10 @@ public interface FlowConfigsResourceHandler {
    * Get all {@link FlowConfig}
    */
   Collection<FlowConfig> getAllFlowConfigs();
+  /**
+   * Get all {@link FlowConfig} with pagination
+   */
+  Collection<FlowConfig> getAllFlowConfigs(int start, int count);
 
   /**
    * Add {@link FlowConfig}
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 1aebb7566..a679d768c 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -106,7 +106,12 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public List<FlowConfig> getAll(@Context PagingContext pagingContext) {
-    return (List) this.getFlowConfigResourceHandler().getAllFlowConfigs();
+    // Check to see if the count and start parameters are user defined or 
default from the framework
+    if (!pagingContext.hasCount() && !pagingContext.hasStart())
+      return (List) this.getFlowConfigResourceHandler().getAllFlowConfigs();
+    else {
+      return (List) 
this.getFlowConfigResourceHandler().getAllFlowConfigs(pagingContext.getStart(), 
pagingContext.getCount());
+    }
   }
 
   /**
@@ -125,9 +130,21 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
       @Optional @QueryParam("isRunImmediately") Boolean isRunImmediately,
       @Optional @QueryParam("owningGroup") String owningGroup,
       @Optional @QueryParam("propertyFilter") String propertyFilter) {
-    FlowSpecSearchObject flowSpecSearchObject = new FlowSpecSearchObject(null, 
flowGroup, flowName,
-        templateUri, userToProxy, sourceIdentifier, destinationIdentifier, 
schedule, null,
-        isRunImmediately, owningGroup, propertyFilter);
+    FlowSpecSearchObject flowSpecSearchObject;
+    // Check to see if the count and start parameters are user defined or 
default from the framework
+    // Start is the index of the first specStore configurations to return
+    // Count is the total number of specStore configurations to return
+    if (!context.hasCount() && !context.hasStart()){
+      flowSpecSearchObject = new FlowSpecSearchObject(null, flowGroup, 
flowName,
+          templateUri, userToProxy, sourceIdentifier, destinationIdentifier, 
schedule, null,
+          isRunImmediately, owningGroup, propertyFilter, -1, -1);
+    }
+    else {
+      flowSpecSearchObject = new FlowSpecSearchObject(null, flowGroup, 
flowName,
+          templateUri, userToProxy, sourceIdentifier, destinationIdentifier, 
schedule, null,
+          isRunImmediately, owningGroup, propertyFilter, context.getStart(), 
context.getCount());
+    }
+
     return (List) 
this.getFlowConfigResourceHandler().getFlowConfig(flowSpecSearchObject);
   }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
index 644d5f4a9..dc5d6447e 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
@@ -57,6 +57,8 @@ public class FlowSpecSearchObject implements SpecSearchObject 
{
   private final Boolean isRunImmediately;
   private final String owningGroup;
   private final String propertyFilter;
+  private final int start;
+  private final int count;
 
   public static FlowSpecSearchObject fromFlowId(FlowId flowId) {
     return 
FlowSpecSearchObject.builder().flowGroup(flowId.getFlowGroup()).flowName(flowId.getFlowName()).build();
@@ -67,6 +69,7 @@ public class FlowSpecSearchObject implements SpecSearchObject 
{
   public String augmentBaseGetStatement(String baseStatement)
       throws IOException {
     List<String> conditions = new ArrayList<>();
+    List<String> limitAndOffset = new ArrayList<>();
 
     /*
      * IMPORTANT: the order of `conditions` added must align with the order of 
parameter binding later in `completePreparedStatement`!
@@ -116,6 +119,14 @@ public class FlowSpecSearchObject implements 
SpecSearchObject {
       conditions.add("owning_group = ?");
     }
 
+    if (this.getCount() > 0) {
+      // Order by two fields to make a full order by
+      limitAndOffset.add(" ORDER BY modified_time DESC, spec_uri ASC LIMIT ?");
+      if (this.getStart() > 0) {
+        limitAndOffset.add(" OFFSET ?");
+      }
+    }
+
     // If the propertyFilter is myKey=myValue, it looks for a config where key 
is `myKey` and value contains string `myValue`.
     // If the propertyFilter string does not have `=`, it considers the string 
as a key and just looks for its existence.
     // Multiple occurrences of `=` in  propertyFilter are not supported and 
ignored completely.
@@ -136,11 +147,10 @@ public class FlowSpecSearchObject implements 
SpecSearchObject {
       }
     }
 
-    if (conditions.size() == 0) {
+    if (conditions.size() == 0 && limitAndOffset.size() == 0) {
       throw new IOException("At least one condition is required to query flow 
configs.");
     }
-
-    return baseStatement + String.join(" AND ", conditions);
+    return baseStatement + String.join(" AND ", conditions) + String.join(" ", 
limitAndOffset);
   }
 
   @Override
@@ -181,7 +191,7 @@ public class FlowSpecSearchObject implements 
SpecSearchObject {
     }
 
     if (this.getSchedule() != null) {
-      statement.setString(++i, this.getModifiedTimestamp());
+      statement.setString(++i, this.getSchedule());
     }
 
     if (this.getIsRunImmediately() != null) {
@@ -191,5 +201,12 @@ public class FlowSpecSearchObject implements 
SpecSearchObject {
     if (this.getOwningGroup() != null) {
       statement.setString(++i, this.getOwningGroup());
     }
+
+    if (this.getCount() > 0) {
+      statement.setInt(++i, this.getCount());
+      if (this.getStart() > 0) {
+        statement.setInt(++i, this.getStart());
+      }
+    }
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index 1f2be8476..dfd643cd4 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -165,6 +165,11 @@ public abstract class InstrumentedSpecStore implements 
SpecStore {
     return this.getURIsWithTagTimer.invokeMayThrowIO(() -> 
getSpecURIsWithTagImpl(tag));
   }
 
+  @Override
+  public Collection<Spec> getSpecs(int start, int count) throws IOException {
+    return this.getTimer.invokeMayThrowIO(() -> getSpecsImpl(start, count));
+  }
+
   @Override
   public int getSize() throws IOException {
     return this.getSizeTimer.invokeMayThrowIO(() -> getSizeImpl());
@@ -179,6 +184,7 @@ public abstract class InstrumentedSpecStore implements 
SpecStore {
   public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
   public abstract Iterator<URI> getSpecURIsWithTagImpl(String tag) throws 
IOException;
   public abstract int getSizeImpl() throws IOException;
+  public abstract Collection<Spec> getSpecsImpl(int start, int count) throws 
IOException;
 
   /** child classes can implement this if they want to get specs using {@link 
SpecSearchObject} */
   public Collection<Spec> getSpecsImpl(SpecSearchObject specUri) throws 
IOException {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 259d5c7d0..3642d5a02 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -116,6 +116,12 @@ public interface SpecStore {
    */
   Collection<Spec> getSpecs() throws IOException;
 
+  /***
+   * Get all {@link Spec}s from the {@link SpecStore} with pagination input.
+   * @throws IOException Exception in retrieving {@link Spec}s.
+   */
+  Collection<Spec> getSpecs(int start, int count) throws IOException;
+
   /**
    * Return an iterator of Spec URIs(Spec identifiers)
    */
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index f459c9588..b269279eb 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -312,6 +312,23 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
     }
   }
 
+  /**
+   * A function to get all specs in the {@link SpecStore} between the provided 
start index and (start + count - 1) index, inclusive.
+   * This enables pagination so getting SpecStore object will not timeout, and 
can be tuned to how many results is desired at any one time.
+   * The {@link Spec} in {@link SpecStore} are sorted in descending order of 
the modified_time while paginating.
+   *
+   * @param start The start index.
+   * @param count The total number of records to get.
+   * @return A collection of specs between start and start + count - 1, 
inclusive.
+   */
+  public Collection<Spec> getAllSpecs(int start, int count) {
+    try {
+      return specStore.getSpecs(start, count);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot retrieve specs from Spec stores 
between " + start + " and " + (start + count - 1), e);
+    }
+  }
+
   /**
    * A wrapper of getSpecs that handles {@link SpecNotFoundException} properly.
    * This is the most common way to fetch {@link Spec}. For customized way to 
deal with exception, one will
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 921138fd0..6f3bbe024 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -357,6 +357,11 @@ public class FSSpecStore extends InstrumentedSpecStore {
     return getSizeImpl(this.fsSpecStoreDirPath);
   }
 
+  @Override
+  public Collection<Spec> getSpecsImpl(int start, int count) throws 
UnsupportedOperationException {
+    throw new UnsupportedOperationException();
+  }
+
   private int getSizeImpl(Path directory) throws IOException {
     int specs = 0;
     FileStatus[] fileStatuses = fs.listStatus(directory);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index c328a2f20..df3d65912 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -271,6 +271,24 @@ public class MysqlBaseSpecStore extends 
InstrumentedSpecStore {
     });
   }
 
+  @Override
+  public Collection<Spec> getSpecsImpl(int start, int count) throws 
IOException {
+    List<String> limitAndOffset = new ArrayList<>();
+    if (count > 0) {
+      // Order by two fields to make a full order by
+      limitAndOffset.add(" ORDER BY modified_time DESC, spec_uri ASC LIMIT");
+      limitAndOffset.add(String.valueOf(count));
+      if (start > 0) {
+        limitAndOffset.add("OFFSET");
+        limitAndOffset.add(String.valueOf(start));
+      }
+    }
+    String finalizedStatement = this.sqlStatements.getAllStatement + 
String.join(" ", limitAndOffset);
+    return withPreparedStatement(finalizedStatement, statement -> {
+      return retrieveSpecs(statement);
+    });
+  }
+
   @Override
   public Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException {
     return 
withPreparedStatement(this.sqlStatements.getAllURIsWithTagStatement, statement 
-> {
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index b7e9df53b..90659d333 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -87,6 +87,7 @@ public class MysqlSpecStoreTest {
         .withConfig(ConfigBuilder.create()
             .addPrimitive("key", "value")
             .addPrimitive("key3", "value3")
+            .addPrimitive("filter.this.flow", true)
             .addPrimitive("config.with.dot", "value4")
             .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
             .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
@@ -98,6 +99,7 @@ public class MysqlSpecStoreTest {
     flowSpec2 = FlowSpec.builder(this.uri2)
         .withConfig(ConfigBuilder.create().addPrimitive("converter", 
"value1,value2,value3")
             .addPrimitive("key3", "value3")
+            .addPrimitive("filter.this.flow", true)
             .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
             .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
             .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg2")
@@ -107,6 +109,7 @@ public class MysqlSpecStoreTest {
         .build();
     flowSpec3 = FlowSpec.builder(this.uri3)
         .withConfig(ConfigBuilder.create().addPrimitive("key3", "value3")
+            .addPrimitive("filter.this.flow", true)
             .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
             .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
             .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg3")
@@ -240,6 +243,81 @@ public class MysqlSpecStoreTest {
     Assert.assertEquals(result.size(), 2);
   }
 
+  @Test (dependsOnMethods = "testGetSpec")
+  public void testGetFilterSpecPaginate() throws Exception {
+    /**
+     * Sorted order of the specStore configurations is flowSpec1, flowSpec2.
+     * flowSpec3 is not included as it is a 'corrupted' flowspec
+     * flowSpec4 is not included as it doesn't have the 'filter.this.flow' 
property
+     * Start is the offset of the first configuration to return
+     * Count is the total number of configurations to return
+     * PropertyFilter is the property to filter by
+     */
+
+    // Start of 0 and count of 1 means start from index 0, and return one 
configuration only
+    FlowSpecSearchObject flowSpecSearchObject = 
FlowSpecSearchObject.builder().start(0).count(1).propertyFilter("filter.this.flow").build();
+    Collection<Spec> specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertFalse(specs.contains(this.flowSpec2));
+    Assert.assertFalse(specs.contains(this.flowSpec4));
+
+    // Start of 1 and count of 1 means start from index 1, and return one 
configuration only
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().start(1).count(1).propertyFilter("filter.this.flow").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertFalse(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertFalse(specs.contains(this.flowSpec4));
+
+    /**
+     * Start of 0 and count of 5 means start from index 0, and return five 
configuration only
+     * Total of 3 flowSpecs in the DB, but flowSpec4 doesn't have 
'filter.this.flow' filter so only returns 2 flowSpecs
+     * flowSpec1 and flowSpec2 match all the criteria
+     */
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().start(0).count(5).propertyFilter("filter.this.flow").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertFalse(specs.contains(this.flowSpec4));
+  }
+
+  @Test (dependsOnMethods =  "testGetSpec")
+  public void testGetAllSpecPaginate() throws Exception {
+    /**
+     * Sorted order of the specStore configurations is flowSpec1, flowSpec2, 
flowSpec4
+     */
+    // Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so 
return all 3 flowSpecs
+    Collection<Spec> specs = this.specStore.getSpecs(0,10);
+    Assert.assertEquals(specs.size(), 3);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertTrue(specs.contains(this.flowSpec4));
+
+    // Return all flowSpecs using the default get all specs function. Testing 
default functionality of returning everything
+    specs = this.specStore.getSpecs();
+    Assert.assertEquals(specs.size(), 3);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertTrue(specs.contains(this.flowSpec4));
+
+    // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only 
return first two.
+    specs = this.specStore.getSpecs(0,2);
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertFalse(specs.contains(this.flowSpec4));
+
+    // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only 
return first two.
+    // Check that functionality for not including a start value is the same as 
including start value of 0
+    specs = this.specStore.getSpecs(-1, 2);
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertFalse(specs.contains(this.flowSpec4));
+  }
+
   @Test (expectedExceptions = {IOException.class})
   public void testGetCorruptedSpec() throws Exception {
     this.specStore.addSpec(this.flowSpec3);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index f1b6e0acf..44890a21d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -99,6 +99,11 @@ public class GobblinServiceFlowConfigResourceHandler 
implements FlowConfigsResou
     return this.localHandler.getAllFlowConfigs();
   }
 
+  @Override
+  public Collection<FlowConfig> getAllFlowConfigs(int start, int count) {
+    return this.localHandler.getAllFlowConfigs(start, count);
+  }
+
   /**
    * Adding {@link FlowConfig} should check if current node is active (master).
    * If current node is active, call {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)} directly.
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 13e607d19..baac020fe 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service;
 
 import java.io.File;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -86,8 +87,18 @@ public class GobblinServiceManagerTest {
   private static final String TEST_GROUP_NAME = "testGroup";
   private static final String TEST_FLOW_NAME = "testFlow";
   private static final String TEST_FLOW_NAME2 = "testFlow2";
+  private static final String TEST_FLOW_NAME3 = "testFlow3";
+  private static final String TEST_FLOW_NAME4 = "testFlow4";
+  private static final String TEST_FLOW_NAME5 = "testFlow5";
+  private static final String TEST_FLOW_NAME6 = "testFlow6";
+  private static final String TEST_FLOW_NAME7 = "testFlow7";
   private static final FlowId TEST_FLOW_ID = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
   private static final FlowId TEST_FLOW_ID2 = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME2);
+  private static final FlowId TEST_FLOW_ID3 = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME3);
+  private static final FlowId TEST_FLOW_ID4 = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME4);
+  private static final FlowId TEST_FLOW_ID5 = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME5);
+  private static final FlowId TEST_FLOW_ID6 = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME6);
+  private static final FlowId TEST_FLOW_ID7 = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME7);
   private static final FlowId UNCOMPILABLE_FLOW_ID = new 
FlowId().setFlowGroup(TEST_GROUP_NAME)
       .setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
 
@@ -142,6 +153,10 @@ public class GobblinServiceManagerTest {
 
     serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, 
TOPOLOGY_SPEC_STORE_DIR);
     serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, 
FLOW_SPEC_STORE_DIR);
+    serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_CLASS_KEY, 
"org.apache.gobblin.runtime.spec_store.MysqlSpecStore");
+    serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, 
"flow_spec_store");
+    serviceCoreProperties.put(FlowCatalog.FLOWSPEC_SERDE_CLASS_KEY, 
"org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe");
+
     
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY,
 TEST_GOBBLIN_EXECUTOR_NAME);
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  
TEST_GOBBLIN_EXECUTOR_NAME + ".description",
         "StandaloneTestExecutor");
@@ -469,7 +484,7 @@ null, null, null, null);
     File testFlowFile = new File(GIT_CLONE_DIR + 
"/gobblin-config/testGroup/testFlow.pull");
     testFlowFile.getParentFile().mkdirs();
 
-    Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n", 
testFlowFile, Charsets.UTF_8);
+    
Files.write("{\"id\":{\"flowName\":\"testFlow\",\"flowGroup\":\"testGroup\"},\"param1\":\"value20\"}",
 testFlowFile, Charsets.UTF_8);
 
     Collection<Spec> specs = 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
     Assert.assertEquals(specs.size(), 0);
@@ -544,4 +559,133 @@ null, null, null, null);
     this.flowConfigClient = new 
FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
         this.gobblinServiceManager.getRestLiServerListeningURI().getPort()), 
transportClientProperties);
   }
+
+  @Test (dependsOnMethods = "testGitCreate")
+  public void testGetAllPaginated() throws Exception {
+    // Order of the flows by descending modified_time, and ascending flow.name 
should be: testFlow, testFlow2, testFlow3, testFlow4
+    FlowConfig flowConfig1 = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    this.flowConfigClient.createFlowConfig(flowConfig1);
+
+    FlowConfig flowConfig2 = new FlowConfig().setId(TEST_FLOW_ID2)
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    this.flowConfigClient.createFlowConfig(flowConfig2);
+
+    FlowConfig flowConfig3 = new FlowConfig().setId(TEST_FLOW_ID3)
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    this.flowConfigClient.createFlowConfig(flowConfig3);
+
+    FlowConfig flowConfig4 = new FlowConfig().setId(TEST_FLOW_ID4)
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    this.flowConfigClient.createFlowConfig(flowConfig4);
+
+    // Check that there are a total of 4 flowConfigs by using the default 
getAll call
+    Collection<FlowConfig> flowConfigs = 
this.flowConfigClient.getAllFlowConfigs();
+    Assert.assertEquals(flowConfigs.size(), 4);
+
+    // Check that there are a total of 4 flowConfigs using new getAll call
+    flowConfigs = this.flowConfigClient.getAllFlowConfigs(0,20);
+    Assert.assertEquals(flowConfigs.size(), 4);
+
+    // Attempt pagination with one element from the start of the specStore 
configurations stored
+    // Start at index 0 and return 1 element
+    flowConfigs = this.flowConfigClient.getAllFlowConfigs(0,1);
+    Assert.assertEquals(flowConfigs.size(), 1);
+    
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
 "testFlow");
+
+    // Attempt pagination with one element from the specStore configurations 
stored with offset of 1
+    // Start at index 1 and return 1 element
+    flowConfigs = this.flowConfigClient.getAllFlowConfigs(1,1);
+    Assert.assertEquals(flowConfigs.size(), 1);
+    
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
 "testFlow2");
+
+    // Attempt pagination with one element from the specStore configurations 
stored with offset of 2
+    // Start at index 2 and return 1 element
+    flowConfigs = this.flowConfigClient.getAllFlowConfigs(2,1);
+    Assert.assertEquals(flowConfigs.size(), 1);
+    
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
 "testFlow3");
+
+    // Attempt pagination with one element from the specStore configurations 
stored with offset of 3
+    // Start at index 3 and return 1 element
+    flowConfigs = this.flowConfigClient.getAllFlowConfigs(3,1);
+    Assert.assertEquals(flowConfigs.size(), 1);
+    
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
 "testFlow4");
+
+    // Attempt pagination with 20 elements from the specStore configurations 
stored with offset of 1
+    // Start at index 1 and return up to 20 elements.
+    // Only 4 elements exist in total, so 3 are returned because of the offset of 1
+    flowConfigs = this.flowConfigClient.getAllFlowConfigs(1,20);
+    Assert.assertEquals(flowConfigs.size(), 3);
+    List flowNameArray = new ArrayList();
+    List expectedResults = new ArrayList();
+    expectedResults.add("testFlow2");
+    expectedResults.add("testFlow3");
+    expectedResults.add("testFlow4");
+    for (FlowConfig fc : flowConfigs) {
+      flowNameArray.add(fc.getId().getFlowName());
+    }
+    Assert.assertEquals(flowNameArray, expectedResults);
+
+    // Clean up the flowConfigs added in for the pagination tests
+    this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID);
+    this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID2);
+    this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID3);
+    this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID4);
+  }
+
+  @Test (dependsOnMethods = "testGitCreate")
+  public void testGetFilteredFlowsPaginated() throws Exception {
+    // Attempt pagination with one element from the start of the specStore 
configurations stored. Filter by the owningGroup of "Keep.this"
+    FlowConfig flowConfig2 = new 
FlowConfig().setId(TEST_FLOW_ID5).setOwningGroup("Filter.this")
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    this.flowConfigClient.createFlowConfig(flowConfig2);
+
+    FlowConfig flowConfig3 = new 
FlowConfig().setId(TEST_FLOW_ID6).setOwningGroup("Keep.this")
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    this.flowConfigClient.createFlowConfig(flowConfig3);
+
+    FlowConfig flowConfig4 = new 
FlowConfig().setId(TEST_FLOW_ID7).setOwningGroup("Keep.this")
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    this.flowConfigClient.createFlowConfig(flowConfig4);
+
+    // Start at index 0 and return 1 element
+    Collection<FlowConfig> flowConfigs = 
this.flowConfigClient.getFlowConfigs(null, null, null, null, null, null,
+        TEST_SCHEDULE, null, "Keep.this", null, 0, 1);
+    Assert.assertEquals(flowConfigs.size(), 1);
+    
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
 "testFlow6");
+
+    // Attempt pagination with one element from the start of the specStore 
configurations stored. Filter by the owningGroup of "Keep.this"
+    // Start at index 1 and return 1 element
+    flowConfigs = this.flowConfigClient.getFlowConfigs(null, null, null, null, 
null, null,
+        TEST_SCHEDULE, null, "Keep.this", null, 1, 1);
+    Assert.assertEquals(flowConfigs.size(), 1);
+    
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
 "testFlow7");
+
+    // Attempt pagination with one element from the start of the specStore 
configurations stored. Filter by the owningGroup of "Keep.this"
+    // Start at index 0 and return up to 20 elements. In this case, only 2 
items match, so both are returned
+    flowConfigs = this.flowConfigClient.getFlowConfigs(null, null, null, null, 
null, null,
+        TEST_SCHEDULE, null, "Keep.this", null, 0, 20);
+    Assert.assertEquals(flowConfigs.size(), 2);
+
+    List flowNameArray = new ArrayList();
+    List expectedResults = new ArrayList();
+    expectedResults.add("testFlow6");
+    expectedResults.add("testFlow7");
+    for (FlowConfig fc : flowConfigs) {
+      flowNameArray.add(fc.getId().getFlowName());
+    }
+    Assert.assertEquals(flowNameArray, expectedResults);
+
+    // Clean up the flowConfigs added in for the pagination tests
+    this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID5);
+    this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID6);
+    this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID7);
+  }
 }
\ No newline at end of file
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index e988e6e25..3bda269ab 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -207,8 +207,8 @@ ext.externalDependency = [
         "org.slf4j:slf4j-log4j12:" + slf4jVersion
     ],
     "postgresConnector": "org.postgresql:postgresql:42.1.4",
-    "testContainers": "org.testcontainers:testcontainers:1.15.3",
-    "testContainersMysql": "org.testcontainers:mysql:1.15.3"
+    "testContainers": "org.testcontainers:testcontainers:1.17.3",
+    "testContainersMysql": "org.testcontainers:mysql:1.17.3"
 ]
 
 if (!isDefaultEnvironment)

Reply via email to