This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a61f019cbf NIFI-10966: Add option to QuerySalesforceObject to run 
custom query
a61f019cbf is described below

commit a61f019cbf4c44d2ceac59f3dfa46b6770dcfd63
Author: Lehel Boér <[email protected]>
AuthorDate: Thu Dec 8 23:49:36 2022 +0100

    NIFI-10966: Add option to QuerySalesforceObject to run custom query
    
    This closes #6794.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../salesforce/QuerySalesforceObject.java          | 210 +++++++++++++++++----
 .../additionalDetails.html                         |   7 +-
 .../salesforce/QuerySalesforceObjectIT.java        |  17 ++
 3 files changed, 197 insertions(+), 37 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
index 8a2a465c7a..169e3cdd8b 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -18,18 +18,23 @@ package org.apache.nifi.processors.salesforce;
 
 import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
 import org.apache.camel.component.salesforce.api.dto.SObjectField;
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -47,6 +52,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
 import 
org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter;
@@ -89,16 +95,17 @@ import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
 
-@PrimaryNodeOnly
 @TriggerSerially
-@TriggerWhenEmpty
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"salesforce", "sobject", "soql", "query"})
 @CapabilityDescription("Retrieves records from a Salesforce sObject. Users can 
add arbitrary filter conditions by setting the 'Custom WHERE Condition' 
property."
+        + " The processor can also run a custom query, although record 
processing is not supported in that case."
         + " Supports incremental retrieval: users can define a field in the 
'Age Field' property that will be used to determine when the record was 
created."
         + " When this property is set the processor will retrieve new records. 
It's also possible to define an initial cutoff value for the age, filtering out 
all older records"
-        + " even for the first run. This processor is intended to be run on 
the Primary Node only."
-        + " FlowFile attribute 'record.count' indicates how many records were 
retrieved and written to the output.")
+        + " even for the first run. In case of 'Property Based Query' this 
processor should run on the Primary Node only."
+        + " FlowFile attribute 'record.count' indicates how many records were 
retrieved and written to the output."
+        + " By using 'Custom Query', the processor can accept an optional 
input flowfile and reference the flowfile attributes in the query."
+        + " However, incremental loading and record-based processing are not 
supported in this scenario.")
 @Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, 
after performing a query the time of execution is stored. Subsequent queries 
will be augmented"
         + " with an additional condition so that only records that are newer 
than the stored execution time (adjusted with the optional value of 'Age 
Delay') will be retrieved."
         + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
@@ -110,6 +117,28 @@ import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {
 
+    static final AllowableValue PROPERTY_BASED_QUERY = new 
AllowableValue("property-based-query", "Property Based Query", "Provide query 
by properties.");
+    static final AllowableValue CUSTOM_QUERY = new 
AllowableValue("custom-query", "Custom Query", "Provide custom SOQL query.");
+
+    static final PropertyDescriptor QUERY_TYPE = new 
PropertyDescriptor.Builder()
+            .name("query-type")
+            .displayName("Query Type")
+            .description("Choose to provide the query by parameters or a full 
custom query.")
+            .required(true)
+            .defaultValue(PROPERTY_BASED_QUERY.getValue())
+            .allowableValues(PROPERTY_BASED_QUERY, CUSTOM_QUERY)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_SOQL_QUERY = new 
PropertyDescriptor.Builder()
+            .name("custom-soql-query")
+            .displayName("Custom SOQL Query")
+            .description("Specify the SOQL query to run.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(QUERY_TYPE, CUSTOM_QUERY)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     static final PropertyDescriptor SOBJECT_NAME = new 
PropertyDescriptor.Builder()
             .name("sobject-name")
             .displayName("sObject Name")
@@ -117,6 +146,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .required(true)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
     static final PropertyDescriptor FIELD_NAMES = new 
PropertyDescriptor.Builder()
@@ -126,6 +156,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .required(false)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
     static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
@@ -133,7 +164,8 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .displayName("Record Writer")
             .description("Service used for writing records returned from the 
Salesforce REST API")
             .identifiesControllerService(RecordSetWriterFactory.class)
-            .required(true)
+            .required(false)
+            .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
     static final PropertyDescriptor CREATE_ZERO_RECORD_FILES = new 
PropertyDescriptor.Builder()
@@ -144,6 +176,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .allowableValues("true", "false")
             .defaultValue("false")
             .required(true)
+            .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
     static final PropertyDescriptor AGE_FIELD = new 
PropertyDescriptor.Builder()
@@ -155,6 +188,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .required(false)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
     static final PropertyDescriptor AGE_DELAY = new 
PropertyDescriptor.Builder()
@@ -166,6 +200,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .dependsOn(AGE_FIELD)
+            .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
     static final PropertyDescriptor INITIAL_AGE_FILTER = new 
PropertyDescriptor.Builder()
@@ -176,6 +211,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .dependsOn(AGE_FIELD)
+            .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
     static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new 
PropertyDescriptor.Builder()
@@ -185,6 +221,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .required(false)
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -192,19 +229,35 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .description("For FlowFiles created as a result of a successful 
query.")
             .build();
 
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The input flowfile gets sent to this relationship 
when the query succeeds.")
+            .autoTerminateDefault(true)
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("The input flowfile gets sent to this relationship 
when the query fails.")
+            .build();
+
     private static final String LAST_AGE_FILTER = "last_age_filter";
     private static final String STARTING_FIELD_NAME = "records";
     private static final String DATE_FORMAT = "yyyy-MM-dd";
     private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
     private static final String DATE_TIME_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
     private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
+    private static final String TOTAL_SIZE = "totalSize";
+    private static final String RECORDS = "records";
     private static final BiPredicate<String, String> CAPTURE_PREDICATE = 
(fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final String TOTAL_RECORD_COUNT = "total.record.count";
 
     private volatile SalesforceToRecordSchemaConverter 
salesForceToRecordSchemaConverter;
     private volatile SalesforceRestService salesforceRestService;
 
     @OnScheduled
-    public void onScheduled(final ProcessContext context) {
+    public void onScheduled(ProcessContext context) {
         salesForceToRecordSchemaConverter = new 
SalesforceToRecordSchemaConverter(
                 DATE_FORMAT,
                 DATE_TIME_FORMAT,
@@ -223,34 +276,40 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
         );
     }
 
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            QUERY_TYPE,
+            CUSTOM_SOQL_QUERY,
+            SOBJECT_NAME,
+            FIELD_NAMES,
+            RECORD_WRITER,
+            AGE_FIELD,
+            INITIAL_AGE_FILTER,
+            AGE_DELAY,
+            CUSTOM_WHERE_CONDITION,
+            READ_TIMEOUT,
+            CREATE_ZERO_RECORD_FILES,
+            TOKEN_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS, REL_FAILURE, REL_ORIGINAL
+    )));
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return Collections.unmodifiableList(Arrays.asList(
-                API_URL,
-                API_VERSION,
-                SOBJECT_NAME,
-                FIELD_NAMES,
-                READ_TIMEOUT,
-                TOKEN_PROVIDER,
-                RECORD_WRITER,
-                CREATE_ZERO_RECORD_FILES,
-                AGE_FIELD,
-                INITIAL_AGE_FILTER,
-                AGE_DELAY,
-                CUSTOM_WHERE_CONDITION
-        ));
+        return PROPERTIES;
     }
 
     @Override
     public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        return relationships;
+        return RELATIONSHIPS;
     }
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
-        final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
+        List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
         if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() && 
!validationContext.getProperty(AGE_FIELD).isSet()) {
             results.add(
                     new ValidationResult.Builder()
@@ -264,7 +323,24 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        boolean isCustomQuery = 
CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+
+
+        if (isCustomQuery) {
+            FlowFile flowFile = session.get();
+            if (flowFile == null && context.hasIncomingConnection()) {
+                context.yield();
+                return;
+            }
+            processCustomQuery(context, session, flowFile);
+            return;
+        }
+        processQuery(context, session);
+    }
+
+    private void processQuery(ProcessContext context, ProcessSession session) {
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
         String sObject = context.getProperty(SOBJECT_NAME).getValue();
         String fields = context.getProperty(FIELD_NAMES).getValue();
         String customWhereClause = 
context.getProperty(CUSTOM_WHERE_CONDITION).getValue();
@@ -316,10 +392,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
                 ageFilterUpper
         );
 
-        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
-
         do {
-
             FlowFile flowFile = session.create();
             Map<String, String> originalAttributes = flowFile.getAttributes();
             Map<String, String> attributes = new HashMap<>();
@@ -328,7 +401,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
 
             flowFile = session.write(flowFile, out -> {
                 try (
-                        InputStream querySObjectResultInputStream = 
getResultInputStream(nextRecordsUrl, querySObject);
+                        InputStream querySObjectResultInputStream = 
getResultInputStream(nextRecordsUrl.get(), querySObject);
 
                         JsonTreeRowRecordReader jsonReader = new 
JsonTreeRowRecordReader(
                                 querySObjectResultInputStream,
@@ -398,11 +471,78 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
         } while (nextRecordsUrl.get() != null);
     }
 
-    private InputStream getResultInputStream(AtomicReference<String> 
nextRecordsUrl, String querySObject) {
-        if (nextRecordsUrl.get() == null) {
+    private void processCustomQuery(ProcessContext context, ProcessSession 
session, FlowFile originalFlowFile) {
+        String customQuery = 
context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+        AtomicReference<String> totalSize = new AtomicReference<>();
+        boolean isOriginalTransferred = false;
+        List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+        do {
+            FlowFile outgoingFlowFile;
+            try (InputStream response = 
getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                if (originalFlowFile != null) {
+                    outgoingFlowFile = session.create(originalFlowFile);
+                } else {
+                    outgoingFlowFile = session.create();
+                }
+                outgoingFlowFiles.add(outgoingFlowFile);
+                outgoingFlowFile = session.write(outgoingFlowFile, 
parseHttpResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 2000 : 
Integer.parseInt(totalSize.get()) % 2000;
+                Map<String, String> attributes = new HashMap<>();
+                attributes.put(CoreAttributes.MIME_TYPE.key(), 
"application/json");
+                attributes.put(TOTAL_RECORD_COUNT, 
String.valueOf(recordCount));
+                session.adjustCounter("Salesforce records processed", 
recordCount, false);
+                session.putAllAttributes(outgoingFlowFile, attributes);
+            } catch (IOException e) {
+                throw new ProcessException("Couldn't get Salesforce records", 
e);
+            } catch (Exception e) {
+                if (originalFlowFile != null) {
+                    session.transfer(originalFlowFile, REL_FAILURE);
+                    isOriginalTransferred = true;
+                }
+                getLogger().error("Couldn't get Salesforce records", e);
+                session.remove(outgoingFlowFiles);
+                outgoingFlowFiles.clear();
+                break;
+            }
+        } while (nextRecordsUrl.get() != null);
+
+        if (!outgoingFlowFiles.isEmpty()) {
+            session.transfer(outgoingFlowFiles, REL_SUCCESS);
+        }
+        if (originalFlowFile != null && !isOriginalTransferred) {
+            session.transfer(originalFlowFile, REL_ORIGINAL);
+        }
+    }
+
+    private OutputStreamCallback parseHttpResponse(InputStream in, 
AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+        nextRecordsUrl.set(null);
+        return out -> {
+            try (JsonParser jsonParser = JSON_FACTORY.createParser(in);
+                 JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                while (jsonParser.nextToken() != null) {
+                    if (nextTokenIs(jsonParser, TOTAL_SIZE)) {
+                        totalSize.set(jsonParser.getValueAsString());
+                    } else if (nextTokenIs(jsonParser, NEXT_RECORDS_URL)) {
+                        nextRecordsUrl.set(jsonParser.getValueAsString());
+                    } else if (nextTokenIs(jsonParser, RECORDS)) {
+                        jsonGenerator.copyCurrentStructure(jsonParser);
+                    }
+                }
+            }
+        };
+    }
+
+    private boolean nextTokenIs(JsonParser jsonParser, String value) throws 
IOException {
+        return jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && 
jsonParser.getCurrentName()
+                .equals(value) && jsonParser.nextToken() != null;
+    }
+
+    private InputStream getResultInputStream(String nextRecordsUrl, String 
querySObject) {
+        if (nextRecordsUrl == null) {
             return salesforceRestService.query(querySObject);
         }
-        return salesforceRestService.getNextRecords(nextRecordsUrl.get());
+        return salesforceRestService.getNextRecords(nextRecordsUrl);
     }
 
     private SalesforceSchemaHolder getConvertedSalesforceSchema(String 
sObject, String fields) {
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.QuerySalesforceObject/additionalDetails.html
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.QuerySalesforceObject/additionalDetails.html
index 6d05afbf52..c35b44f4eb 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.QuerySalesforceObject/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.QuerySalesforceObject/additionalDetails.html
@@ -34,8 +34,11 @@ td {text-align: left}
 
 <p>
     Objects in Salesforce are database tables, their rows are known as 
records, and their columns are called fields. The QuerySalesforceObject 
processor queries Salesforce objects and retrieves their records.
-    The processor constructs the query using SOQL (Salesforce Object Query 
Language) and retrieves the result record dataset using the Salesforce REST API.
-    The processor utilizes streams and NiFi record-based processing to be able 
to handle a large number of records and to allow arbitrary output format.
+    The processor constructs the query from processor properties or executes a 
custom SOQL (Salesforce Object Query Language) query and retrieves the result 
record dataset using the Salesforce REST API.
+    The 'Query Type' processor property allows the query to be built in two 
ways. The 'Property Based Query' option allows you to define a 'SELECT 
&lt;fields&gt; from &lt;Salesforce object&gt;' type query,
+    with the fields defined in the 'Field Names' property and the Salesforce 
object defined in the 'sObject Name' property, whereas the 'Custom Query' 
option allows you to supply an arbitrary SOQL query.
+    By using 'Custom Query', the processor can accept an optional input 
flowfile and reference the flowfile attributes in the query. However, 
incremental loading and record-based processing are only supported
+    with the 'Property Based Query' option.
 </p>
 
 <h3>OAuth2 Access Token Provider Service</h3>
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
index f3f5bd50eb..b3abddebbf 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
@@ -63,6 +63,7 @@ class QuerySalesforceObjectIT implements 
SalesforceConfigAware {
         runner.addControllerService("writer", writer);
         runner.enableControllerService(writer);
 
+        runner.setProperty(QuerySalesforceObject.QUERY_TYPE, 
QuerySalesforceObject.PROPERTY_BASED_QUERY);
         runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, sObjectName);
         runner.setProperty(QuerySalesforceObject.FIELD_NAMES, fieldNames);
         runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
@@ -77,4 +78,20 @@ class QuerySalesforceObjectIT implements 
SalesforceConfigAware {
 
         assertNotNull(results.get(0).getContent());
     }
+
+    @Test
+    void runCustomQuery() {
+        String customQuery = "SELECT Id, Name, AccountId, 
Account.ShippingAddress FROM Contact";
+
+        runner.setProperty(QuerySalesforceObject.QUERY_TYPE, 
QuerySalesforceObject.CUSTOM_QUERY);
+        runner.setProperty(QuerySalesforceObject.CUSTOM_SOQL_QUERY, 
customQuery);
+        runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
+        runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+
+        runner.run();
+
+        List<MockFlowFile> results = 
runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);
+
+        assertNotNull(results.get(0).getContent());
+    }
 }

Reply via email to