This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a61f019cbf NIFI-10966: Add option to QuerySalesforceObject to run
custom query
a61f019cbf is described below
commit a61f019cbf4c44d2ceac59f3dfa46b6770dcfd63
Author: Lehel Boér <[email protected]>
AuthorDate: Thu Dec 8 23:49:36 2022 +0100
NIFI-10966: Add option to QuerySalesforceObject to run custom query
This closes #6794.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../salesforce/QuerySalesforceObject.java | 210 +++++++++++++++++----
.../additionalDetails.html | 7 +-
.../salesforce/QuerySalesforceObjectIT.java | 17 ++
3 files changed, 197 insertions(+), 37 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
index 8a2a465c7a..169e3cdd8b 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -18,18 +18,23 @@ package org.apache.nifi.processors.salesforce;
import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
import org.apache.camel.component.salesforce.api.dto.SObjectField;
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -47,6 +52,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
import
org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter;
@@ -89,16 +95,17 @@ import static
org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
-@PrimaryNodeOnly
@TriggerSerially
-@TriggerWhenEmpty
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"salesforce", "sobject", "soql", "query"})
@CapabilityDescription("Retrieves records from a Salesforce sObject. Users can
add arbitrary filter conditions by setting the 'Custom WHERE Condition'
property."
+ + " The processor can also run a custom query, although record
processing is not supported in that case."
+ " Supports incremental retrieval: users can define a field in the
'Age Field' property that will be used to determine when the record was
created."
+ " When this property is set the processor will retrieve new records.
It's also possible to define an initial cutoff value for the age, filtering out
all older records"
- + " even for the first run. This processor is intended to be run on
the Primary Node only."
- + " FlowFile attribute 'record.count' indicates how many records were
retrieved and written to the output.")
+ + " even for the first run. In case of 'Property Based Query' this
processor should run on the Primary Node only."
+ + " FlowFile attribute 'record.count' indicates how many records were
retrieved and written to the output."
+ + " By using 'Custom Query', the processor can accept an optional
input flowfile and reference the flowfile attributes in the query."
+ + " However, incremental loading and record-based processing are not
supported in this scenario.")
@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set,
after performing a query the time of execution is stored. Subsequent queries
will be augmented"
+ " with an additional condition so that only records that are newer
than the stored execution time (adjusted with the optional value of 'Age
Delay') will be retrieved."
+ " State is stored across the cluster so that this Processor can be
run on Primary Node only and if a new Primary Node is selected,"
@@ -110,6 +117,28 @@ import static
org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class QuerySalesforceObject extends AbstractProcessor {
+ static final AllowableValue PROPERTY_BASED_QUERY = new
AllowableValue("property-based-query", "Property Based Query", "Provide query
by properties.");
+ static final AllowableValue CUSTOM_QUERY = new
AllowableValue("custom-query", "Custom Query", "Provide custom SOQL query.");
+
+ static final PropertyDescriptor QUERY_TYPE = new
PropertyDescriptor.Builder()
+ .name("query-type")
+ .displayName("Query Type")
+ .description("Choose to provide the query by parameters or a full
custom query.")
+ .required(true)
+ .defaultValue(PROPERTY_BASED_QUERY.getValue())
+ .allowableValues(PROPERTY_BASED_QUERY, CUSTOM_QUERY)
+ .build();
+
+ static final PropertyDescriptor CUSTOM_SOQL_QUERY = new
PropertyDescriptor.Builder()
+ .name("custom-soql-query")
+ .displayName("Custom SOQL Query")
+ .description("Specify the SOQL query to run.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(QUERY_TYPE, CUSTOM_QUERY)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
static final PropertyDescriptor SOBJECT_NAME = new
PropertyDescriptor.Builder()
.name("sobject-name")
.displayName("sObject Name")
@@ -117,6 +146,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
static final PropertyDescriptor FIELD_NAMES = new
PropertyDescriptor.Builder()
@@ -126,6 +156,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
@@ -133,7 +164,8 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.displayName("Record Writer")
.description("Service used for writing records returned from the
Salesforce REST API")
.identifiesControllerService(RecordSetWriterFactory.class)
- .required(true)
+ .required(false)
+ .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
static final PropertyDescriptor CREATE_ZERO_RECORD_FILES = new
PropertyDescriptor.Builder()
@@ -144,6 +176,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
+ .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
static final PropertyDescriptor AGE_FIELD = new
PropertyDescriptor.Builder()
@@ -155,6 +188,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
static final PropertyDescriptor AGE_DELAY = new
PropertyDescriptor.Builder()
@@ -166,6 +200,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.dependsOn(AGE_FIELD)
+ .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
static final PropertyDescriptor INITIAL_AGE_FILTER = new
PropertyDescriptor.Builder()
@@ -176,6 +211,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(AGE_FIELD)
+ .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new
PropertyDescriptor.Builder()
@@ -185,6 +221,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -192,19 +229,35 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.description("For FlowFiles created as a result of a successful
query.")
.build();
+ static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The input flowfile gets sent to this relationship
when the query succeeds.")
+ .autoTerminateDefault(true)
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("The input flowfile gets sent to this relationship
when the query fails.")
+ .build();
+
private static final String LAST_AGE_FILTER = "last_age_filter";
private static final String STARTING_FIELD_NAME = "records";
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
private static final String DATE_TIME_FORMAT =
"yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
+ private static final String TOTAL_SIZE = "totalSize";
+ private static final String RECORDS = "records";
private static final BiPredicate<String, String> CAPTURE_PREDICATE =
(fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+ private static final String TOTAL_RECORD_COUNT = "total.record.count";
private volatile SalesforceToRecordSchemaConverter
salesForceToRecordSchemaConverter;
private volatile SalesforceRestService salesforceRestService;
@OnScheduled
- public void onScheduled(final ProcessContext context) {
+ public void onScheduled(ProcessContext context) {
salesForceToRecordSchemaConverter = new
SalesforceToRecordSchemaConverter(
DATE_FORMAT,
DATE_TIME_FORMAT,
@@ -223,34 +276,40 @@ public class QuerySalesforceObject extends
AbstractProcessor {
);
}
+ private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
+ API_URL,
+ API_VERSION,
+ QUERY_TYPE,
+ CUSTOM_SOQL_QUERY,
+ SOBJECT_NAME,
+ FIELD_NAMES,
+ RECORD_WRITER,
+ AGE_FIELD,
+ INITIAL_AGE_FILTER,
+ AGE_DELAY,
+ CUSTOM_WHERE_CONDITION,
+ READ_TIMEOUT,
+ CREATE_ZERO_RECORD_FILES,
+ TOKEN_PROVIDER
+ ));
+
+ private static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_SUCCESS, REL_FAILURE, REL_ORIGINAL
+ )));
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return Collections.unmodifiableList(Arrays.asList(
- API_URL,
- API_VERSION,
- SOBJECT_NAME,
- FIELD_NAMES,
- READ_TIMEOUT,
- TOKEN_PROVIDER,
- RECORD_WRITER,
- CREATE_ZERO_RECORD_FILES,
- AGE_FIELD,
- INITIAL_AGE_FILTER,
- AGE_DELAY,
- CUSTOM_WHERE_CONDITION
- ));
+ return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- return relationships;
+ return RELATIONSHIPS;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
- final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+ List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() &&
!validationContext.getProperty(AGE_FIELD).isSet()) {
results.add(
new ValidationResult.Builder()
@@ -264,7 +323,24 @@ public class QuerySalesforceObject extends
AbstractProcessor {
}
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ boolean isCustomQuery =
CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+
+
+ if (isCustomQuery) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null && context.hasIncomingConnection()) {
+ context.yield();
+ return;
+ }
+ processCustomQuery(context, session, flowFile);
+ return;
+ }
+ processQuery(context, session);
+ }
+
+ private void processQuery(ProcessContext context, ProcessSession session) {
+ AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
String sObject = context.getProperty(SOBJECT_NAME).getValue();
String fields = context.getProperty(FIELD_NAMES).getValue();
String customWhereClause =
context.getProperty(CUSTOM_WHERE_CONDITION).getValue();
@@ -316,10 +392,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
ageFilterUpper
);
- AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
-
do {
-
FlowFile flowFile = session.create();
Map<String, String> originalAttributes = flowFile.getAttributes();
Map<String, String> attributes = new HashMap<>();
@@ -328,7 +401,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
flowFile = session.write(flowFile, out -> {
try (
- InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl, querySObject);
+ InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl.get(), querySObject);
JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
querySObjectResultInputStream,
@@ -398,11 +471,78 @@ public class QuerySalesforceObject extends
AbstractProcessor {
} while (nextRecordsUrl.get() != null);
}
- private InputStream getResultInputStream(AtomicReference<String>
nextRecordsUrl, String querySObject) {
- if (nextRecordsUrl.get() == null) {
+ private void processCustomQuery(ProcessContext context, ProcessSession
session, FlowFile originalFlowFile) {
+ String customQuery =
context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
+ AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+ AtomicReference<String> totalSize = new AtomicReference<>();
+ boolean isOriginalTransferred = false;
+ List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+ do {
+ FlowFile outgoingFlowFile;
+ try (InputStream response =
getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+ if (originalFlowFile != null) {
+ outgoingFlowFile = session.create(originalFlowFile);
+ } else {
+ outgoingFlowFile = session.create();
+ }
+ outgoingFlowFiles.add(outgoingFlowFile);
+ outgoingFlowFile = session.write(outgoingFlowFile,
parseHttpResponse(response, nextRecordsUrl, totalSize));
+ int recordCount = nextRecordsUrl.get() != null ? 2000 :
Integer.parseInt(totalSize.get()) % 2000;
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
"application/json");
+ attributes.put(TOTAL_RECORD_COUNT,
String.valueOf(recordCount));
+ session.adjustCounter("Salesforce records processed",
recordCount, false);
+ session.putAllAttributes(outgoingFlowFile, attributes);
+ } catch (IOException e) {
+ throw new ProcessException("Couldn't get Salesforce records",
e);
+ } catch (Exception e) {
+ if (originalFlowFile != null) {
+ session.transfer(originalFlowFile, REL_FAILURE);
+ isOriginalTransferred = true;
+ }
+ getLogger().error("Couldn't get Salesforce records", e);
+ session.remove(outgoingFlowFiles);
+ outgoingFlowFiles.clear();
+ break;
+ }
+ } while (nextRecordsUrl.get() != null);
+
+ if (!outgoingFlowFiles.isEmpty()) {
+ session.transfer(outgoingFlowFiles, REL_SUCCESS);
+ }
+ if (originalFlowFile != null && !isOriginalTransferred) {
+ session.transfer(originalFlowFile, REL_ORIGINAL);
+ }
+ }
+
+ private OutputStreamCallback parseHttpResponse(InputStream in,
AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+ nextRecordsUrl.set(null);
+ return out -> {
+ try (JsonParser jsonParser = JSON_FACTORY.createParser(in);
+ JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+ while (jsonParser.nextToken() != null) {
+ if (nextTokenIs(jsonParser, TOTAL_SIZE)) {
+ totalSize.set(jsonParser.getValueAsString());
+ } else if (nextTokenIs(jsonParser, NEXT_RECORDS_URL)) {
+ nextRecordsUrl.set(jsonParser.getValueAsString());
+ } else if (nextTokenIs(jsonParser, RECORDS)) {
+ jsonGenerator.copyCurrentStructure(jsonParser);
+ }
+ }
+ }
+ };
+ }
+
+ private boolean nextTokenIs(JsonParser jsonParser, String value) throws
IOException {
+ return jsonParser.getCurrentToken() == JsonToken.FIELD_NAME &&
jsonParser.getCurrentName()
+ .equals(value) && jsonParser.nextToken() != null;
+ }
+
+ private InputStream getResultInputStream(String nextRecordsUrl, String
querySObject) {
+ if (nextRecordsUrl == null) {
return salesforceRestService.query(querySObject);
}
- return salesforceRestService.getNextRecords(nextRecordsUrl.get());
+ return salesforceRestService.getNextRecords(nextRecordsUrl);
}
private SalesforceSchemaHolder getConvertedSalesforceSchema(String
sObject, String fields) {
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.QuerySalesforceObject/additionalDetails.html
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.QuerySalesforceObject/additionalDetails.html
index 6d05afbf52..c35b44f4eb 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.QuerySalesforceObject/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.QuerySalesforceObject/additionalDetails.html
@@ -34,8 +34,11 @@ td {text-align: left}
<p>
Objects in Salesforce are database tables, their rows are known as
records, and their columns are called fields. The QuerySalesforceObject
processor queries Salesforce objects and retrieves their records.
- The processor constructs the query using SOQL (Salesforce Object Query
Language) and retrieves the result record dataset using the Salesforce REST API.
- The processor utilizes streams and NiFi record-based processing to be able
to handle a large number of records and to allow arbitrary output format.
+ The processor constructs the query from processor properties or executes a
custom SOQL (Salesforce Object Query Language) query and retrieves the result
record dataset using the Salesforce REST API.
+ The 'Query Type' processor property allows the query to be built in two
ways. The 'Property Based Query' option allows you to define a 'SELECT
<fields> from <Salesforce object>' type query,
+ with the fields defined in the 'Field Names' property and the Salesforce
object defined in the 'sObject Name' property, whereas the 'Custom Query'
option allows you to supply an arbitrary SOQL query.
+ By using 'Custom Query', the processor can accept an optional input
flowfile and reference the flowfile attributes in the query. However,
incremental loading and record-based processing are only supported
+ in 'Property Based Queries'.
</p>
<h3>OAuth2 Access Token Provider Service</h3>
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
index f3f5bd50eb..b3abddebbf 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
@@ -63,6 +63,7 @@ class QuerySalesforceObjectIT implements
SalesforceConfigAware {
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
+ runner.setProperty(QuerySalesforceObject.QUERY_TYPE,
QuerySalesforceObject.PROPERTY_BASED_QUERY);
runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, sObjectName);
runner.setProperty(QuerySalesforceObject.FIELD_NAMES, fieldNames);
runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
@@ -77,4 +78,20 @@ class QuerySalesforceObjectIT implements
SalesforceConfigAware {
assertNotNull(results.get(0).getContent());
}
+
+ @Test
+ void runCustomQuery() {
+ String customQuery = "SELECT Id, Name, AccountId,
Account.ShippingAddress FROM Contact";
+
+ runner.setProperty(QuerySalesforceObject.QUERY_TYPE,
QuerySalesforceObject.CUSTOM_QUERY);
+ runner.setProperty(QuerySalesforceObject.CUSTOM_SOQL_QUERY,
customQuery);
+ runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
+ runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+
+ runner.run();
+
+ List<MockFlowFile> results =
runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);
+
+ assertNotNull(results.get(0).getContent());
+ }
}