This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b069c1345 [GOBBLIN-1675] Add pagination for GaaS on server side (#3533)
b069c1345 is described below
commit b069c134509d28322b08562d4b69954f2dff99c2
Author: Andy Jiang <[email protected]>
AuthorDate: Thu Aug 18 13:28:53 2022 -0700
[GOBBLIN-1675] Add pagination for GaaS on server side (#3533)
* added mysql statement and take in pageContext
* Working functionality for count and start in api layer
* Revert unnecessary changes
* Revert unncessary changes
* Add tests for pagination with filterFlow
* Handle start and count for getall call
* Handle start and count for getAll and getFilterFlows call in API layer of
GaaS
* Add tests and fix for pagination for getAll
* Refactor conditions
* Fixed tests
* Add in condition to separate user defined count and start from default
values
* Fix checkstyle
* Handle get all with pagination case separately from filter
* Completed ServiceManagerTest for getAll
* remove modified_time field from Get all statement
* Completed GobblinServiceManager tests for filtered flows with pagination
* Remove separate call for tests
* Add context for debug logs and documenation
* Update user.name and user.email on commits
* Add in second ordering field
* Fix testGitCreate test by utilizing json
Co-authored-by: Andy Jiang <[email protected]>
---
.../apache/gobblin/service/FlowConfigV2Client.java | 32 +++++
.../service/FlowConfigResourceLocalHandler.java | 7 +
.../service/FlowConfigsResourceHandler.java | 4 +
.../gobblin/service/FlowConfigsV2Resource.java | 25 +++-
.../gobblin/runtime/api/FlowSpecSearchObject.java | 25 +++-
.../gobblin/runtime/api/InstrumentedSpecStore.java | 6 +
.../org/apache/gobblin/runtime/api/SpecStore.java | 6 +
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 17 +++
.../gobblin/runtime/spec_store/FSSpecStore.java | 5 +
.../runtime/spec_store/MysqlBaseSpecStore.java | 18 +++
.../runtime/spec_store/MysqlSpecStoreTest.java | 78 +++++++++++
.../GobblinServiceFlowConfigResourceHandler.java | 5 +
.../gobblin/service/GobblinServiceManagerTest.java | 146 ++++++++++++++++++++-
gradle/scripts/dependencyDefinitions.gradle | 4 +-
14 files changed, 367 insertions(+), 11 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index ed9972c8b..f45ecf807 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -205,6 +205,19 @@ public class FlowConfigV2Client implements Closeable {
return response.getEntity().getElements();
}
+ /**
+ * Get all {@link FlowConfig}s
+ * @return all {@link FlowConfig}s within the range of start + count - 1,
inclusive
+ * @throws RemoteInvocationException
+ */
+ public Collection<FlowConfig> getAllFlowConfigs(int start, int count) throws
RemoteInvocationException {
+ LOG.debug("getAllFlowConfigs with pagination called. Start: {}. Count:
{}.", start, count);
+
+ GetAllRequest<FlowConfig> getRequest =
_flowconfigsV2RequestBuilders.getAll().paginate(start, count).build();
+ Response<CollectionResponse<FlowConfig>> response =
_restClient.get().sendRequest(getRequest).getResponse();
+ return response.getEntity().getElements();
+ }
+
/**
* Get all {@link FlowConfig}s that matches the provided parameters. All the
parameters are optional.
* If a parameter is null, it is ignored. {@see
FlowConfigV2Resource#getFilteredFlows}
@@ -224,6 +237,25 @@ public class FlowConfigV2Client implements Closeable {
return response.getEntity().getElements();
}
+ /**
+ * Get all {@link FlowConfig}s that matches the provided parameters. All the
parameters are optional.
+ * If a parameter is null, it is ignored. {@see
FlowConfigV2Resource#getFilteredFlows}
+ */
+ public Collection<FlowConfig> getFlowConfigs(String flowGroup, String
flowName, String templateUri, String userToProxy,
+ String sourceIdentifier, String destinationIdentifier, String schedule,
Boolean isRunImmediately, String owningGroup,
+ String propertyFilter, int start, int count) throws
RemoteInvocationException {
+ LOG.debug("getFilteredFlows pagination called. flowGroup: {}, flowName:
{}, start: {}, count: {}.", flowGroup, flowName, start, count);
+
+ FindRequest<FlowConfig> getRequest =
_flowconfigsV2RequestBuilders.findByFilterFlows()
+
.flowGroupParam(flowGroup).flowNameParam(flowName).templateUriParam(templateUri).userToProxyParam(userToProxy)
+
.sourceIdentifierParam(sourceIdentifier).destinationIdentifierParam(destinationIdentifier).scheduleParam(schedule)
+
.isRunImmediatelyParam(isRunImmediately).owningGroupParam(owningGroup).propertyFilterParam(propertyFilter).paginate(start,
count).build();
+
+ Response<CollectionResponse<FlowConfig>> response =
_restClient.get().sendRequest(getRequest).getResponse();
+
+ return response.getEntity().getElements();
+ }
+
/**
* Delete a flow configuration
* @param flowId identifier of flow configuration to delete
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 7e91c9d1a..5837359d1 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -112,6 +112,13 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
return
flowCatalog.getAllSpecs().stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
}
+ /**
+ * Get all flow configs in between start and start + count - 1
+ */
+ public Collection<FlowConfig> getAllFlowConfigs(int start, int count) {
+ return flowCatalog.getAllSpecs(start,
count).stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
+ }
+
/**
* Add flowConfig locally and trigger all listeners iff @param
triggerListener is set to true
*/
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
index cc224eb5f..6f6ace9c9 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
@@ -41,6 +41,10 @@ public interface FlowConfigsResourceHandler {
* Get all {@link FlowConfig}
*/
Collection<FlowConfig> getAllFlowConfigs();
+ /**
+ * Get all {@link FlowConfig} with pagination
+ */
+ Collection<FlowConfig> getAllFlowConfigs(int start, int count);
/**
* Add {@link FlowConfig}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 1aebb7566..a679d768c 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -106,7 +106,12 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public List<FlowConfig> getAll(@Context PagingContext pagingContext) {
- return (List) this.getFlowConfigResourceHandler().getAllFlowConfigs();
+ // Check to see if the count and start parameters are user defined or
default from the framework
+ if (!pagingContext.hasCount() && !pagingContext.hasStart())
+ return (List) this.getFlowConfigResourceHandler().getAllFlowConfigs();
+ else {
+ return (List)
this.getFlowConfigResourceHandler().getAllFlowConfigs(pagingContext.getStart(),
pagingContext.getCount());
+ }
}
/**
@@ -125,9 +130,21 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
@Optional @QueryParam("isRunImmediately") Boolean isRunImmediately,
@Optional @QueryParam("owningGroup") String owningGroup,
@Optional @QueryParam("propertyFilter") String propertyFilter) {
- FlowSpecSearchObject flowSpecSearchObject = new FlowSpecSearchObject(null,
flowGroup, flowName,
- templateUri, userToProxy, sourceIdentifier, destinationIdentifier,
schedule, null,
- isRunImmediately, owningGroup, propertyFilter);
+ FlowSpecSearchObject flowSpecSearchObject;
+ // Check to see if the count and start parameters are user defined or
default from the framework
+ // Start is the index of the first specStore configurations to return
+ // Count is the total number of specStore configurations to return
+ if (!context.hasCount() && !context.hasStart()){
+ flowSpecSearchObject = new FlowSpecSearchObject(null, flowGroup,
flowName,
+ templateUri, userToProxy, sourceIdentifier, destinationIdentifier,
schedule, null,
+ isRunImmediately, owningGroup, propertyFilter, -1, -1);
+ }
+ else {
+ flowSpecSearchObject = new FlowSpecSearchObject(null, flowGroup,
flowName,
+ templateUri, userToProxy, sourceIdentifier, destinationIdentifier,
schedule, null,
+ isRunImmediately, owningGroup, propertyFilter, context.getStart(),
context.getCount());
+ }
+
return (List)
this.getFlowConfigResourceHandler().getFlowConfig(flowSpecSearchObject);
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
index 644d5f4a9..dc5d6447e 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
@@ -57,6 +57,8 @@ public class FlowSpecSearchObject implements SpecSearchObject
{
private final Boolean isRunImmediately;
private final String owningGroup;
private final String propertyFilter;
+ private final int start;
+ private final int count;
public static FlowSpecSearchObject fromFlowId(FlowId flowId) {
return
FlowSpecSearchObject.builder().flowGroup(flowId.getFlowGroup()).flowName(flowId.getFlowName()).build();
@@ -67,6 +69,7 @@ public class FlowSpecSearchObject implements SpecSearchObject
{
public String augmentBaseGetStatement(String baseStatement)
throws IOException {
List<String> conditions = new ArrayList<>();
+ List<String> limitAndOffset = new ArrayList<>();
/*
* IMPORTANT: the order of `conditions` added must align with the order of
parameter binding later in `completePreparedStatement`!
@@ -116,6 +119,14 @@ public class FlowSpecSearchObject implements
SpecSearchObject {
conditions.add("owning_group = ?");
}
+ if (this.getCount() > 0) {
+ // Order by two fields to make a full order by
+ limitAndOffset.add(" ORDER BY modified_time DESC, spec_uri ASC LIMIT ?");
+ if (this.getStart() > 0) {
+ limitAndOffset.add(" OFFSET ?");
+ }
+ }
+
// If the propertyFilter is myKey=myValue, it looks for a config where key
is `myKey` and value contains string `myValue`.
// If the propertyFilter string does not have `=`, it considers the string
as a key and just looks for its existence.
// Multiple occurrences of `=` in propertyFilter are not supported and
ignored completely.
@@ -136,11 +147,10 @@ public class FlowSpecSearchObject implements
SpecSearchObject {
}
}
- if (conditions.size() == 0) {
+ if (conditions.size() == 0 && limitAndOffset.size() == 0) {
throw new IOException("At least one condition is required to query flow
configs.");
}
-
- return baseStatement + String.join(" AND ", conditions);
+ return baseStatement + String.join(" AND ", conditions) + String.join(" ",
limitAndOffset);
}
@Override
@@ -181,7 +191,7 @@ public class FlowSpecSearchObject implements
SpecSearchObject {
}
if (this.getSchedule() != null) {
- statement.setString(++i, this.getModifiedTimestamp());
+ statement.setString(++i, this.getSchedule());
}
if (this.getIsRunImmediately() != null) {
@@ -191,5 +201,12 @@ public class FlowSpecSearchObject implements
SpecSearchObject {
if (this.getOwningGroup() != null) {
statement.setString(++i, this.getOwningGroup());
}
+
+ if (this.getCount() > 0) {
+ statement.setInt(++i, this.getCount());
+ if (this.getStart() > 0) {
+ statement.setInt(++i, this.getStart());
+ }
+ }
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index 1f2be8476..dfd643cd4 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -165,6 +165,11 @@ public abstract class InstrumentedSpecStore implements
SpecStore {
return this.getURIsWithTagTimer.invokeMayThrowIO(() ->
getSpecURIsWithTagImpl(tag));
}
+ @Override
+ public Collection<Spec> getSpecs(int start, int count) throws IOException {
+ return this.getTimer.invokeMayThrowIO(() -> getSpecsImpl(start, count));
+ }
+
@Override
public int getSize() throws IOException {
return this.getSizeTimer.invokeMayThrowIO(() -> getSizeImpl());
@@ -179,6 +184,7 @@ public abstract class InstrumentedSpecStore implements
SpecStore {
public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
public abstract Iterator<URI> getSpecURIsWithTagImpl(String tag) throws
IOException;
public abstract int getSizeImpl() throws IOException;
+ public abstract Collection<Spec> getSpecsImpl(int start, int count) throws
IOException;
/** child classes can implement this if they want to get specs using {@link
SpecSearchObject} */
public Collection<Spec> getSpecsImpl(SpecSearchObject specUri) throws
IOException {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 259d5c7d0..3642d5a02 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -116,6 +116,12 @@ public interface SpecStore {
*/
Collection<Spec> getSpecs() throws IOException;
+ /***
+ * Get all {@link Spec}s from the {@link SpecStore} with pagination input.
+ * @throws IOException Exception in retrieving {@link Spec}s.
+ */
+ Collection<Spec> getSpecs(int start, int count) throws IOException;
+
/**
* Return an iterator of Spec URIs(Spec identifiers)
*/
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index f459c9588..b269279eb 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -312,6 +312,23 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
}
}
+ /**
+ * A function to get all specs in the {@link SpecStore} between the provided
start index and (start + count - 1) index, inclusive.
+ * This enables pagination so getting SpecStore object will not timeout, and
can be tuned to how many results is desired at any one time.
+ * The {@link Spec} in {@link SpecStore} are sorted in descending order of
the modified_time while paginating.
+ *
+ * @param start The start index.
+ * @param count The total number of records to get.
+ * @return A collection of specs between start and start + count - 1,
inclusive.
+ */
+ public Collection<Spec> getAllSpecs(int start, int count) {
+ try {
+ return specStore.getSpecs(start, count);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot retrieve specs from Spec stores
between " + start + " and " + (start + count - 1), e);
+ }
+ }
+
/**
* A wrapper of getSpecs that handles {@link SpecNotFoundException} properly.
* This is the most common way to fetch {@link Spec}. For customized way to
deal with exception, one will
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 921138fd0..6f3bbe024 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -357,6 +357,11 @@ public class FSSpecStore extends InstrumentedSpecStore {
return getSizeImpl(this.fsSpecStoreDirPath);
}
+ @Override
+ public Collection<Spec> getSpecsImpl(int start, int count) throws
UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
+
private int getSizeImpl(Path directory) throws IOException {
int specs = 0;
FileStatus[] fileStatuses = fs.listStatus(directory);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index c328a2f20..df3d65912 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -271,6 +271,24 @@ public class MysqlBaseSpecStore extends
InstrumentedSpecStore {
});
}
+ @Override
+ public Collection<Spec> getSpecsImpl(int start, int count) throws
IOException {
+ List<String> limitAndOffset = new ArrayList<>();
+ if (count > 0) {
+ // Order by two fields to make a full order by
+ limitAndOffset.add(" ORDER BY modified_time DESC, spec_uri ASC LIMIT");
+ limitAndOffset.add(String.valueOf(count));
+ if (start > 0) {
+ limitAndOffset.add("OFFSET");
+ limitAndOffset.add(String.valueOf(start));
+ }
+ }
+ String finalizedStatement = this.sqlStatements.getAllStatement +
String.join(" ", limitAndOffset);
+ return withPreparedStatement(finalizedStatement, statement -> {
+ return retrieveSpecs(statement);
+ });
+ }
+
@Override
public Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException {
return
withPreparedStatement(this.sqlStatements.getAllURIsWithTagStatement, statement
-> {
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index b7e9df53b..90659d333 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -87,6 +87,7 @@ public class MysqlSpecStoreTest {
.withConfig(ConfigBuilder.create()
.addPrimitive("key", "value")
.addPrimitive("key3", "value3")
+ .addPrimitive("filter.this.flow", true)
.addPrimitive("config.with.dot", "value4")
.addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
.addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
@@ -98,6 +99,7 @@ public class MysqlSpecStoreTest {
flowSpec2 = FlowSpec.builder(this.uri2)
.withConfig(ConfigBuilder.create().addPrimitive("converter",
"value1,value2,value3")
.addPrimitive("key3", "value3")
+ .addPrimitive("filter.this.flow", true)
.addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
.addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg2")
@@ -107,6 +109,7 @@ public class MysqlSpecStoreTest {
.build();
flowSpec3 = FlowSpec.builder(this.uri3)
.withConfig(ConfigBuilder.create().addPrimitive("key3", "value3")
+ .addPrimitive("filter.this.flow", true)
.addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
.addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg3")
@@ -240,6 +243,81 @@ public class MysqlSpecStoreTest {
Assert.assertEquals(result.size(), 2);
}
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testGetFilterSpecPaginate() throws Exception {
+ /**
+ * Sorted order of the specStore configurations is flowSpec1, flowSpec2.
+ * flowSpec3 is not included as it is a 'corrupted' flowspec
+ * flowSpec4 is not included as it doesn't have the 'filter.this.flow'
property
+ * Start is the offset of the first configuration to return
+ * Count is the total number of configurations to return
+ * PropertyFilter is the property to filter by
+ */
+
+ // Start of 0 and count of 1 means start from index 0, and return one
configuration only
+ FlowSpecSearchObject flowSpecSearchObject =
FlowSpecSearchObject.builder().start(0).count(1).propertyFilter("filter.this.flow").build();
+ Collection<Spec> specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertFalse(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+
+ // Start of 1 and count of 1 means start from index 1, and return one
configuration only
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().start(1).count(1).propertyFilter("filter.this.flow").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertFalse(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+
+ /**
+ * Start of 0 and count of 5 means start from index 0, and return five
configuration only
+ * Total of 3 flowSpecs in the DB, but flowSpec4 doesn't have
'filter.this.flow' filter so only returns 2 flowSpecs
+ * flowSpec1 and flowSpec2 match all the criteria
+ */
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().start(0).count(5).propertyFilter("filter.this.flow").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+ }
+
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testGetAllSpecPaginate() throws Exception {
+ /**
+ * Sorted order of the specStore configurations is flowSpec1, flowSpec2,
flowSpec4
+ */
+ // Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so
return all 3 flowSpecs
+ Collection<Spec> specs = this.specStore.getSpecs(0,10);
+ Assert.assertEquals(specs.size(), 3);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertTrue(specs.contains(this.flowSpec4));
+
+ // Return all flowSpecs using the default get all specs function. Testing
default functionality of returning everything
+ specs = this.specStore.getSpecs();
+ Assert.assertEquals(specs.size(), 3);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertTrue(specs.contains(this.flowSpec4));
+
+ // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only
return first two.
+ specs = this.specStore.getSpecs(0,2);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+
+ // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only
return first two.
+ // Check that functionality for not including a start value is the same as
including start value of 0
+ specs = this.specStore.getSpecs(-1, 2);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+ }
+
@Test (expectedExceptions = {IOException.class})
public void testGetCorruptedSpec() throws Exception {
this.specStore.addSpec(this.flowSpec3);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index f1b6e0acf..44890a21d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -99,6 +99,11 @@ public class GobblinServiceFlowConfigResourceHandler
implements FlowConfigsResou
return this.localHandler.getAllFlowConfigs();
}
+ @Override
+ public Collection<FlowConfig> getAllFlowConfigs(int start, int count) {
+ return this.localHandler.getAllFlowConfigs(start, count);
+ }
+
/**
* Adding {@link FlowConfig} should check if current node is active (master).
* If current node is active, call {@link
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)} directly.
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 13e607d19..baac020fe 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service;
import java.io.File;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -86,8 +87,18 @@ public class GobblinServiceManagerTest {
private static final String TEST_GROUP_NAME = "testGroup";
private static final String TEST_FLOW_NAME = "testFlow";
private static final String TEST_FLOW_NAME2 = "testFlow2";
+ private static final String TEST_FLOW_NAME3 = "testFlow3";
+ private static final String TEST_FLOW_NAME4 = "testFlow4";
+ private static final String TEST_FLOW_NAME5 = "testFlow5";
+ private static final String TEST_FLOW_NAME6 = "testFlow6";
+ private static final String TEST_FLOW_NAME7 = "testFlow7";
private static final FlowId TEST_FLOW_ID = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
private static final FlowId TEST_FLOW_ID2 = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME2);
+ private static final FlowId TEST_FLOW_ID3 = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME3);
+ private static final FlowId TEST_FLOW_ID4 = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME4);
+ private static final FlowId TEST_FLOW_ID5 = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME5);
+ private static final FlowId TEST_FLOW_ID6 = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME6);
+ private static final FlowId TEST_FLOW_ID7 = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME7);
private static final FlowId UNCOMPILABLE_FLOW_ID = new
FlowId().setFlowGroup(TEST_GROUP_NAME)
.setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
@@ -142,6 +153,10 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY,
TOPOLOGY_SPEC_STORE_DIR);
serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY,
FLOW_SPEC_STORE_DIR);
+ serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_CLASS_KEY,
"org.apache.gobblin.runtime.spec_store.MysqlSpecStore");
+ serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
"flow_spec_store");
+ serviceCoreProperties.put(FlowCatalog.FLOWSPEC_SERDE_CLASS_KEY,
"org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe");
+
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY,
TEST_GOBBLIN_EXECUTOR_NAME);
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +
TEST_GOBBLIN_EXECUTOR_NAME + ".description",
"StandaloneTestExecutor");
@@ -469,7 +484,7 @@ null, null, null, null);
File testFlowFile = new File(GIT_CLONE_DIR +
"/gobblin-config/testGroup/testFlow.pull");
testFlowFile.getParentFile().mkdirs();
- Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n",
testFlowFile, Charsets.UTF_8);
+
Files.write("{\"id\":{\"flowName\":\"testFlow\",\"flowGroup\":\"testGroup\"},\"param1\":\"value20\"}",
testFlowFile, Charsets.UTF_8);
Collection<Spec> specs =
this.gobblinServiceManager.getFlowCatalog().getSpecs();
Assert.assertEquals(specs.size(), 0);
@@ -544,4 +559,133 @@ null, null, null, null);
this.flowConfigClient = new
FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
this.gobblinServiceManager.getRestLiServerListeningURI().getPort()),
transportClientProperties);
}
+
+ @Test (dependsOnMethods = "testGitCreate")
+ public void testGetAllPaginated() throws Exception {
+ // Order of the flows by descending modified_time, and ascending flow.name
should be: testFlow, testFlow2, testFlow3, testFlow4
+ FlowConfig flowConfig1 = new FlowConfig().setId(TEST_FLOW_ID)
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+ .setProperties(new StringMap(flowProperties));
+ this.flowConfigClient.createFlowConfig(flowConfig1);
+
+ FlowConfig flowConfig2 = new FlowConfig().setId(TEST_FLOW_ID2)
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+ .setProperties(new StringMap(flowProperties));
+ this.flowConfigClient.createFlowConfig(flowConfig2);
+
+ FlowConfig flowConfig3 = new FlowConfig().setId(TEST_FLOW_ID3)
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+ .setProperties(new StringMap(flowProperties));
+ this.flowConfigClient.createFlowConfig(flowConfig3);
+
+ FlowConfig flowConfig4 = new FlowConfig().setId(TEST_FLOW_ID4)
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+ .setProperties(new StringMap(flowProperties));
+ this.flowConfigClient.createFlowConfig(flowConfig4);
+
+ // Check that there are a total of 4 flowConfigs by using the default
getAll call
+ Collection<FlowConfig> flowConfigs =
this.flowConfigClient.getAllFlowConfigs();
+ Assert.assertEquals(flowConfigs.size(), 4);
+
+ // Check that there are a total of 4 flowConfigs using new getAll call
+ flowConfigs = this.flowConfigClient.getAllFlowConfigs(0,20);
+ Assert.assertEquals(flowConfigs.size(), 4);
+
+ // Attempt pagination with one element from the start of the specStore
configurations stored
+ // Start at index 0 and return 1 element
+ flowConfigs = this.flowConfigClient.getAllFlowConfigs(0,1);
+ Assert.assertEquals(flowConfigs.size(), 1);
+
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
"testFlow");
+
+ // Attempt pagination with one element from the specStore configurations
stored with offset of 1
+ // Start at index 1 and return 1 element
+ flowConfigs = this.flowConfigClient.getAllFlowConfigs(1,1);
+ Assert.assertEquals(flowConfigs.size(), 1);
+
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
"testFlow2");
+
+ // Attempt pagination with one element from the specStore configurations
stored with offset of 2
+ // Start at index 2 and return 1 element
+ flowConfigs = this.flowConfigClient.getAllFlowConfigs(2,1);
+ Assert.assertEquals(flowConfigs.size(), 1);
+
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
"testFlow3");
+
+ // Attempt pagination with one element from the specStore configurations
stored with offset of 3
+ // Start at index 2 and return 1 element
+ flowConfigs = this.flowConfigClient.getAllFlowConfigs(3,1);
+ Assert.assertEquals(flowConfigs.size(), 1);
+
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
"testFlow4");
+
+ // Attempt pagination with 20 element from the specStore configurations
stored with offset of 1
+ // Start at index 1 and return 20 elements if there exists 20 elements.
+ // But only 4 total elements, return 3 elements since offset by 1
+ flowConfigs = this.flowConfigClient.getAllFlowConfigs(1,20);
+ Assert.assertEquals(flowConfigs.size(), 3);
+ List flowNameArray = new ArrayList();
+ List expectedResults = new ArrayList();
+ expectedResults.add("testFlow2");
+ expectedResults.add("testFlow3");
+ expectedResults.add("testFlow4");
+ for (FlowConfig fc : flowConfigs) {
+ flowNameArray.add(fc.getId().getFlowName());
+ }
+ Assert.assertEquals(flowNameArray, expectedResults);
+
+ // Clean up the flowConfigs added in for the pagination tests
+ this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID);
+ this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID2);
+ this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID3);
+ this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID4);
+ }
+
+ @Test (dependsOnMethods = "testGitCreate")
+ public void testGetFilteredFlowsPaginated() throws Exception {
+ // Attempt pagination with one element from the start of the specStore
configurations stored. Filter by the owningGroup of "Keep.this"
+ FlowConfig flowConfig2 = new
FlowConfig().setId(TEST_FLOW_ID5).setOwningGroup("Filter.this")
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+ .setProperties(new StringMap(flowProperties));
+ this.flowConfigClient.createFlowConfig(flowConfig2);
+
+ FlowConfig flowConfig3 = new
FlowConfig().setId(TEST_FLOW_ID6).setOwningGroup("Keep.this")
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+ .setProperties(new StringMap(flowProperties));
+ this.flowConfigClient.createFlowConfig(flowConfig3);
+
+ FlowConfig flowConfig4 = new
FlowConfig().setId(TEST_FLOW_ID7).setOwningGroup("Keep.this")
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+ .setProperties(new StringMap(flowProperties));
+ this.flowConfigClient.createFlowConfig(flowConfig4);
+
+ // Start at index 0 and return 1 element
+ Collection<FlowConfig> flowConfigs =
this.flowConfigClient.getFlowConfigs(null, null, null, null, null, null,
+ TEST_SCHEDULE, null, "Keep.this", null, 0, 1);
+ Assert.assertEquals(flowConfigs.size(), 1);
+
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
"testFlow6");
+
+ // Attempt pagination with one element from the start of the specStore
configurations stored. Filter by the owningGroup of "Keep.this"
+ // Start at index 1 and return 1 element
+ flowConfigs = this.flowConfigClient.getFlowConfigs(null, null, null, null,
null, null,
+ TEST_SCHEDULE, null, "Keep.this", null, 1, 1);
+ Assert.assertEquals(flowConfigs.size(), 1);
+
Assert.assertEquals(((FlowConfig)(flowConfigs.toArray()[0])).getId().getFlowName(),
"testFlow7");
+
+ // Attempt pagination with one element from the start of the specStore
configurations stored. Filter by the owningGroup of "Keep.this"
+ // Start at index 0 and return 20 element if exists. In this case, only 2
items so return all two items
+ flowConfigs = this.flowConfigClient.getFlowConfigs(null, null, null, null,
null, null,
+ TEST_SCHEDULE, null, "Keep.this", null, 0, 20);
+ Assert.assertEquals(flowConfigs.size(), 2);
+
+ List flowNameArray = new ArrayList();
+ List expectedResults = new ArrayList();
+ expectedResults.add("testFlow6");
+ expectedResults.add("testFlow7");
+ for (FlowConfig fc : flowConfigs) {
+ flowNameArray.add(fc.getId().getFlowName());
+ }
+ Assert.assertEquals(flowNameArray, expectedResults);
+
+ // Clean up the flowConfigs added in for the pagination tests
+ this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID5);
+ this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID6);
+ this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID7);
+ }
}
\ No newline at end of file
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index e988e6e25..3bda269ab 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -207,8 +207,8 @@ ext.externalDependency = [
"org.slf4j:slf4j-log4j12:" + slf4jVersion
],
"postgresConnector": "org.postgresql:postgresql:42.1.4",
- "testContainers": "org.testcontainers:testcontainers:1.15.3",
- "testContainersMysql": "org.testcontainers:mysql:1.15.3"
+ "testContainers": "org.testcontainers:testcontainers:1.17.3",
+ "testContainersMysql": "org.testcontainers:mysql:1.17.3"
]
if (!isDefaultEnvironment)