tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1069710817


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -106,13 +113,54 @@
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {
 
+    static final AllowableValue QUERY_PARAMETERS = new AllowableValue("query-parameters", "Query Parameters", "Provide query by parameters.");

Review Comment:
   This name confused me at first. For most of us, "query parameter" suggests 
a parameter in a prepared statement or even in an HTTP request.
   
   Also, these are not really parameters but properties, aren't they? We are 
using the properties of the processor to build the query.
   
   With that in mind, I'd rename this to PROPERTY_BASED_QUERY or something 
similar.
   
   BTW, both query methods look very vulnerable to injection. Are they not?
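   
   To make the injection concern concrete, here is a minimal sketch of one 
possible mitigation. It assumes the property-based query is assembled by string 
concatenation and only helps where individual names or values are inserted; a 
free-form custom query cannot be sanitized this way. The class, the method 
names and the identifier pattern are hypothetical and not part of the PR.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the PR: guards values that are concatenated into a SOQL string.
final class SoqlInputSketch {

    // Assumption: object and field names are plain identifiers, optionally dot-separated for relationships.
    private static final Pattern IDENTIFIER =
            Pattern.compile("[A-Za-z][A-Za-z0-9_]*(\\.[A-Za-z][A-Za-z0-9_]*)*");

    // Rejects anything that is not a plain identifier instead of trying to repair it.
    static String requireIdentifier(final String name) {
        if (name == null || !IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("Not a plain identifier: " + name);
        }
        return name;
    }

    // Wraps a value in single quotes, escaping backslashes first and then single quotes.
    static String quoteLiteral(final String value) {
        final String escaped = value.replace("\\", "\\\\").replace("'", "\\'");
        return "'" + escaped + "'";
    }

    public static void main(final String[] args) {
        final String query = "SELECT " + requireIdentifier("Name")
                + " FROM " + requireIdentifier("Account")
                + " WHERE Name = " + quoteLiteral("O'Brien");
        System.out.println(query);
    }
}
```

   Rejecting unexpected identifiers outright is a deliberate choice in this 
sketch: it is easier to reason about than trying to clean them up.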



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -261,6 +323,30 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());

Review Comment:
   This onTrigger is getting a little too big.
   
   I would at least move the two kinds of request/result handling logic into 
their own separate methods.



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -261,6 +323,30 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+        if (isCustomQuery) {
+            String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).getValue();
+            do {
+                FlowFile flowFile = session.create();
+                Map<String, String> attributes = new HashMap<>();
+                try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                    flowFile = session.write(flowFile, parseHttpResponse(response, nextRecordsUrl));
+                    int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize) % 2000;

Review Comment:
   Is there a guarantee that a batch always contains exactly 2000 records when 
there are more than that many in total?
   
   In any case, I'd make a constant out of it.
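   
   A small sketch of why the derivation is fragile even if every full batch 
does hold 2000 records: when the total is an exact multiple of the batch size, 
the modulo yields 0 for the last batch. The class, constant and method names 
below are hypothetical; only the 2000 literal and the expression come from the 
diff.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical illustration, not part of the PR.
final class BatchCountSketch {

    // The literal from the diff, pulled out into a named constant.
    static final int MAX_BATCH_SIZE = 2000;

    // Mirrors the expression in the diff: assume a full batch unless this is the last page.
    static int derivedCount(final boolean hasNextPage, final int totalSize) {
        return hasNextPage ? MAX_BATCH_SIZE : totalSize % MAX_BATCH_SIZE;
    }

    // Alternative: count the records that were actually read for this page.
    static int countedRecords(final List<?> recordsInPage) {
        return recordsInPage.size();
    }

    public static void main(final String[] args) {
        // Second and last page of a 4000-record result, assuming two pages of 2000 records each.
        final List<String> lastPage = Collections.nCopies(2000, "record");
        System.out.println("derived: " + derivedCount(false, 4000));
        System.out.println("counted: " + countedRecords(lastPage));
    }
}
```

   Counting while parsing avoids relying on any particular batch size at all.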



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -194,10 +248,16 @@ public class QuerySalesforceObject extends AbstractProcessor {
     private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
     private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
     private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
+    private static final String TOTAL_SIZE = "totalSize";
+    private static final String RECORDS = "records";
     private static final BiPredicate<String, String> CAPTURE_PREDICATE = (fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final String TOTAL_RECORD_COUNT = "total.record.count";
 
     private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
     private volatile SalesforceRestService salesforceRestService;
+    private volatile String totalSize;

Review Comment:
   I don't think this is a good idea. Not only is it not thread-safe (although 
I'm not sure it makes much sense to run this processor on multiple threads), 
but having a running method store intermediate state in fields is bad design in 
general.
   
   The crux of the issue is that the new `parseHttpResponse` method needs to 
return three entirely different things: the output stream callback, the total 
size and the next records URL.
   If we really don't want to create a POJO for those three, we could use an 
Atomic or another wrapper to pass the total size as an in-out parameter 
(similar to how the next records URL is already handled).
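   
   A rough sketch of the two options, with hypothetical names throughout. The 
real `parseHttpResponse` would also return the output stream callback, which is 
left out here to keep the example free of NiFi dependencies; the "parsing" is 
reduced to passing the already extracted values through.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration, not part of the PR.
final class PageResultSketch {

    // Option 1: a small immutable holder returned by the parsing method.
    static final class PageResult {
        private final String nextRecordsUrl; // null when this was the last page
        private final int totalSize;

        PageResult(final String nextRecordsUrl, final int totalSize) {
            this.nextRecordsUrl = nextRecordsUrl;
            this.totalSize = totalSize;
        }

        String getNextRecordsUrl() {
            return nextRecordsUrl;
        }

        int getTotalSize() {
            return totalSize;
        }
    }

    // Option 2: in-out parameters owned by the caller, so no processor field is needed.
    static void capturePage(final String nextUrlFromResponse, final int totalSizeFromResponse,
                            final AtomicReference<String> nextRecordsUrl, final AtomicInteger totalSize) {
        nextRecordsUrl.set(nextUrlFromResponse);
        totalSize.set(totalSizeFromResponse);
    }

    public static void main(final String[] args) {
        final PageResult result = new PageResult(null, 4500);
        System.out.println(result.getTotalSize());

        final AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
        final AtomicInteger totalSize = new AtomicInteger();
        capturePage("/hypothetical/next/page", 4500, nextRecordsUrl, totalSize);
        System.out.println(nextRecordsUrl.get() + " " + totalSize.get());
    }
}
```

   Either way the state lives in local variables of the calling method, so 
concurrent onTrigger invocations cannot overwrite each other's values.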



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
