tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1069710817
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -106,13 +113,54 @@
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class QuerySalesforceObject extends AbstractProcessor {
+ static final AllowableValue QUERY_PARAMETERS = new AllowableValue("query-parameters", "Query Parameters", "Provide query by parameters.");
Review Comment:
This name confused me at first. For most of us, I think, "query parameter" suggests parameters in a prepared statement, or even in an HTTP request.
Also, these aren't really parameters but properties, aren't they? We are using the processor's properties to build the query.
With that in mind, I'd rename this to PROPERTY_BASED_QUERY or something similar.
BTW, both query methods look quite vulnerable to injection, don't they?
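On the injection point, here is a minimal sketch of two common guards. It assumes that property values end up in the SOQL either as identifiers (object or field names) or inside single-quoted string literals. `SoqlGuards`, `requireIdentifier` and `quoteLiteral` are hypothetical names and are not part of the PR. The identifier pattern is also an assumption about what valid API names look like, so it may need adjusting.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of the PR): guards for values that are
 * concatenated into a SOQL query.
 */
final class SoqlGuards {

    // Assumed shape of an API name, optionally with relationship traversal
    // such as "Account.Name"; namespaced names like "ns__Field__c" also match.
    private static final Pattern IDENTIFIER =
            Pattern.compile("[A-Za-z][A-Za-z0-9_]*(\\.[A-Za-z][A-Za-z0-9_]*)*");

    private SoqlGuards() {
    }

    /** Rejects anything that is not a plain identifier, e.g. "Name FROM User --". */
    static String requireIdentifier(String value) {
        if (value == null || !IDENTIFIER.matcher(value).matches()) {
            throw new IllegalArgumentException("Not a valid SOQL identifier: " + value);
        }
        return value;
    }

    /** Escapes a value and wraps it in single quotes for use as a SOQL string literal. */
    static String quoteLiteral(String value) {
        // Escape backslashes first, so the backslashes added for quotes are not doubled.
        String escaped = value.replace("\\", "\\\\").replace("'", "\\'");
        return "'" + escaped + "'";
    }
}
```

Whether this closes every injection path depends on how the processor assembles the query. With the custom query type, the whole SOQL string is user-supplied by design, so the concern there is mostly about values substituted into it (for example through Expression Language).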
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -261,6 +323,30 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
Review Comment:
This onTrigger is getting a little too big.
At the very least, I would move the two kinds of request/result handling logic into their own methods.
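Here is a rough sketch of the suggested split, reduced to its shape: onTrigger only chooses the query mode and then delegates. Everything here is hypothetical. A `Map` stands in for `ProcessContext`, the property name "query-type" and the value "custom-query" are invented for illustration, and the `String` return values only mark which path ran.

```java
import java.util.Map;

/**
 * Hypothetical skeleton (not the PR's code): onTrigger dispatches to one
 * method per query type instead of inlining both request/result loops.
 */
final class QueryDispatchSketch {

    // Invented property name/value; only "query-parameters" appears in the PR.
    static final String QUERY_TYPE = "query-type";
    static final String CUSTOM_QUERY = "custom-query";

    // Stand-in for onTrigger(ProcessContext, ProcessSession).
    String onTrigger(Map<String, String> properties) {
        if (CUSTOM_QUERY.equals(properties.get(QUERY_TYPE))) {
            return processCustomQuery(properties);
        }
        return processPropertyBasedQuery(properties);
    }

    private String processCustomQuery(Map<String, String> properties) {
        // Placeholder: the custom-query pagination loop from the PR would move here.
        return "custom";
    }

    private String processPropertyBasedQuery(Map<String, String> properties) {
        // Placeholder: the existing property-based query handling would move here.
        return "property-based";
    }
}
```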
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -261,6 +323,30 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+ AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+ if (isCustomQuery) {
+ String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).getValue();
+ do {
+ FlowFile flowFile = session.create();
+ Map<String, String> attributes = new HashMap<>();
+ try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+ flowFile = session.write(flowFile, parseHttpResponse(response, nextRecordsUrl));
+ int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize) % 2000;
Review Comment:
Is there a guarantee that a batch will always contain exactly 2000 records when there are more than that in total?
In any case, I'd make a constant out of it.
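Beyond the batch-size question, the modulo has an edge case. When `totalSize` is an exact multiple of 2000 (e.g. 4000), `Integer.parseInt(totalSize) % 2000` reports the last batch as 0 records. The sketch below extracts the constant and contrasts the PR's expression with one that derives the last batch from what has already been emitted. `BatchCount`, `ASSUMED_BATCH_SIZE` and both method names are hypothetical. The second variant still assumes that every non-final batch is full. Counting the elements of the `records` array in each response would avoid relying on the batch size at all.

```java
/**
 * Hypothetical sketch (not the PR's code): the hard-coded 2000 as a named
 * constant, plus two ways of sizing the final batch.
 */
final class BatchCount {

    // Hypothetical name for the value hard-coded in the PR.
    static final int ASSUMED_BATCH_SIZE = 2000;

    private BatchCount() {
    }

    /** The PR's expression with the literal replaced by the constant. */
    static int recordCountAsInPr(boolean hasNextRecordsUrl, int totalSize) {
        return hasNextRecordsUrl ? ASSUMED_BATCH_SIZE : totalSize % ASSUMED_BATCH_SIZE;
    }

    /**
     * Still assumes full non-final batches, but computes the final batch as
     * "whatever is left", so exact multiples of the batch size no longer yield 0.
     */
    static int recordCountFromRemaining(boolean hasNextRecordsUrl, int totalSize, int emittedSoFar) {
        return hasNextRecordsUrl ? ASSUMED_BATCH_SIZE : totalSize - emittedSoFar;
    }
}
```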
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -194,10 +248,16 @@ public class QuerySalesforceObject extends AbstractProcessor {
private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
+ private static final String TOTAL_SIZE = "totalSize";
+ private static final String RECORDS = "records";
private static final BiPredicate<String, String> CAPTURE_PREDICATE = (fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+ private static final String TOTAL_RECORD_COUNT = "total.record.count";
private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
private volatile SalesforceRestService salesforceRestService;
+ private volatile String totalSize;
Review Comment:
I don't think this is a good idea. Not only is it not thread-safe (although I'm not sure it makes much sense to run this processor on multiple threads), but storing a running method's intermediate state in fields is bad design in general.
The crux of the issue is that the new `parseHttpResponse` method needs to return three entirely different things: the output stream callback, this total size, and the next records URL.
If we really don't want to create a POJO for those three, we could use an `AtomicReference` or another wrapper to pass the total size as an in-out parameter (similar to how the next records URL is already handled).
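Below is a sketch of both options, with the per-response state held by the caller instead of in processor fields. `ResponseMetadata`, `ResponseMetadataSketch`, `toMetadata` and `fillInOut` are hypothetical names. A `Map` of top-level fields stands in for whatever the JSON parsing in `parseHttpResponse` has already read. The two key constants mirror `TOTAL_SIZE` and `NEXT_RECORDS_URL` from the diff.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical value object for the metadata a response parser would report. */
final class ResponseMetadata {
    private final int totalSize;
    private final String nextRecordsUrl; // null on the last batch

    ResponseMetadata(int totalSize, String nextRecordsUrl) {
        this.totalSize = totalSize;
        this.nextRecordsUrl = nextRecordsUrl;
    }

    int getTotalSize() {
        return totalSize;
    }

    String getNextRecordsUrl() {
        return nextRecordsUrl;
    }

    boolean hasMoreRecords() {
        return nextRecordsUrl != null;
    }
}

final class ResponseMetadataSketch {
    // Mirrors the constants in the diff.
    static final String NEXT_RECORDS_URL = "nextRecordsUrl";
    static final String TOTAL_SIZE = "totalSize";

    private ResponseMetadataSketch() {
    }

    // Option 1: return a small POJO built from the already-parsed top-level fields.
    static ResponseMetadata toMetadata(Map<String, String> topLevelFields) {
        return new ResponseMetadata(
                Integer.parseInt(topLevelFields.get(TOTAL_SIZE)),
                topLevelFields.get(NEXT_RECORDS_URL));
    }

    // Option 2: in-out parameters, like the existing nextRecordsUrl handling.
    // The references are locals owned by the caller, so no processor field is shared.
    static void fillInOut(Map<String, String> topLevelFields,
                          AtomicReference<String> nextRecordsUrl,
                          AtomicReference<Integer> totalSize) {
        nextRecordsUrl.set(topLevelFields.get(NEXT_RECORDS_URL));
        totalSize.set(Integer.parseInt(topLevelFields.get(TOTAL_SIZE)));
    }
}
```

The POJO keeps the method signature stable if more metadata is needed later. The in-out variant matches the pattern already used for `nextRecordsUrl`.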
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.