krisztina-zsihovszki commented on code in PR #7019:
URL: https://github.com/apache/nifi/pull/7019#discussion_r1129775136


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -385,144 +354,181 @@ private void processQuery(ProcessContext context, 
ProcessSession session) {
                     .collect(Collectors.joining(","));
         }
 
-        String querySObject = buildQuery(
-                sObject,
-                fields,
-                customWhereClause,
-                ageField,
-                initialAgeFilter,
-                ageFilterLower,
-                ageFilterUpper
-        );
+        String querySObject = new SalesforceQueryBuilder(incrementalContext)
+                .buildQuery(sObject, fields, customWhereClause);
+
+        AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
+        List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+        Map<String, String> originalAttributes = 
Optional.ofNullable(originalFlowFile)
+                .map(FlowFile::getAttributes)
+                .orElseGet(HashMap::new);
+
+        long startNanos = System.nanoTime();
 
         do {
-            FlowFile flowFile = session.create();
-            Map<String, String> originalAttributes = flowFile.getAttributes();
-            Map<String, String> attributes = new HashMap<>();
+            FlowFile outgoingFlowFile = createOutgoingFlowFile(session, 
originalFlowFile);
+            outgoingFlowFiles.add(outgoingFlowFile);
+            Map<String, String> attributes = new HashMap<>(originalAttributes);
 
             AtomicInteger recordCountHolder = new AtomicInteger();
-            long startNanos = System.nanoTime();
-            flowFile = session.write(flowFile, out -> {
-                try (
-                        InputStream querySObjectResultInputStream = 
getResultInputStream(nextRecordsUrl.get(), querySObject);
-
-                        JsonTreeRowRecordReader jsonReader = new 
JsonTreeRowRecordReader(
-                                querySObjectResultInputStream,
-                                getLogger(),
-                                salesForceSchemaHolder.recordSchema,
-                                DATE_FORMAT,
-                                TIME_FORMAT,
-                                DATE_TIME_FORMAT,
-                                StartingFieldStrategy.NESTED_FIELD,
-                                STARTING_FIELD_NAME,
-                                SchemaApplicationStrategy.SELECTED_PART,
-                                CAPTURE_PREDICATE
-                        );
-
-                        RecordSetWriter writer = writerFactory.createWriter(
-                                getLogger(),
-                                writerFactory.getSchema(
-                                        originalAttributes,
-                                        salesForceSchemaHolder.recordSchema
-                                ),
-                                out,
-                                originalAttributes
-                        )
-                ) {
-                    writer.beginRecordSet();
-
-                    Record querySObjectRecord;
-                    while ((querySObjectRecord = jsonReader.nextRecord()) != 
null) {
-                        writer.write(querySObjectRecord);
-                    }
-
-                    WriteResult writeResult = writer.finishRecordSet();
+            try {
+                outgoingFlowFile = session.write(outgoingFlowFile, 
processRecordsCallback(context, nextRecordsUrl, writerFactory, state, 
incrementalContext,
+                        salesForceSchemaHolder, querySObject, 
originalAttributes, attributes, recordCountHolder));
+                int recordCount = recordCountHolder.get();
 
-                    Map<String, String> capturedFields = 
jsonReader.getCapturedFields();
+                if (createZeroRecordFlowFiles || recordCount != 0) {
+                    outgoingFlowFile = 
session.putAllAttributes(outgoingFlowFile, attributes);
 
-                    
nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
+                    session.adjustCounter("Records Processed", recordCount, 
false);
+                    getLogger().info("Successfully written {} records for {}", 
recordCount, outgoingFlowFile);
+                } else {
+                    outgoingFlowFiles.remove(outgoingFlowFile);
+                    session.remove(outgoingFlowFile);
+                }
+            } catch (Exception e) {
+                if (e.getCause() instanceof IOException) {
+                    throw new ProcessException("Couldn't get Salesforce 
records", e);
+                } else if (e.getCause() instanceof SchemaNotFoundException) {
+                    handleError(session, originalFlowFile, 
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't create record writer");
+                } else if (e.getCause() instanceof MalformedRecordException) {
+                    handleError(session, originalFlowFile, 
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't read records from 
input");
+                } else {
+                    handleError(session, originalFlowFile, 
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't get Salesforce records");
+                }
+                break;
+            }
+        } while (nextRecordsUrl.get() != null);
 
-                    attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-                    attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
-                    attributes.putAll(writeResult.getAttributes());
+        transferFlowFiles(session, outgoingFlowFiles, originalFlowFile, 
isOriginalTransferred, startNanos, sObject);
+    }
 
-                    recordCountHolder.set(writeResult.getRecordCount());
+    private OutputStreamCallback processRecordsCallback(ProcessContext 
context, AtomicReference<String> nextRecordsUrl, RecordSetWriterFactory 
writerFactory,
+                                                        StateMap state, 
IncrementalContext incrementalContext, SalesforceSchemaHolder 
salesForceSchemaHolder,
+                                                        String querySObject, 
Map<String, String> originalAttributes, Map<String, String> attributes,
+                                                        AtomicInteger 
recordCountHolder) {
+        return out -> {
+            try {
+                handleRecordSet(out, nextRecordsUrl, querySObject, 
writerFactory, salesForceSchemaHolder, originalAttributes, attributes, 
recordCountHolder);
 
-                    if (ageFilterUpper != null) {
-                        Map<String, String> newState = new 
HashMap<>(state.toMap());
-                        newState.put(LAST_AGE_FILTER, ageFilterUpper);
-                        updateState(context, newState);
-                    }
-                } catch (SchemaNotFoundException e) {
-                    throw new ProcessException("Couldn't create record 
writer", e);
-                } catch (MalformedRecordException e) {
-                    throw new ProcessException("Couldn't read records from 
input", e);
+                if (incrementalContext.getAgeFilterUpper() != null) {
+                    Map<String, String> newState = new 
HashMap<>(state.toMap());
+                    newState.put(LAST_AGE_FILTER, 
incrementalContext.getAgeFilterUpper());
+                    updateState(context, newState);
                 }
-            });
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
 
-            int recordCount = recordCountHolder.get();
+    private void handleRecordSet(OutputStream out, AtomicReference<String> 
nextRecordsUrl, String querySObject, RecordSetWriterFactory writerFactory,
+                                 SalesforceSchemaHolder 
salesForceSchemaHolder, Map<String, String> originalAttributes, Map<String, 
String> attributes,
+                                 AtomicInteger recordCountHolder) throws 
Exception {
+        try (
+                InputStream querySObjectResultInputStream = 
getResultInputStream(nextRecordsUrl.get(), querySObject);
+                JsonTreeRowRecordReader jsonReader = 
createJsonReader(querySObjectResultInputStream, 
salesForceSchemaHolder.getRecordSchema());
+                RecordSetWriter writer = createRecordSetWriter(writerFactory, 
originalAttributes, out, salesForceSchemaHolder.getRecordSchema())
+        ) {
+            writer.beginRecordSet();
+
+            Record querySObjectRecord;
+            while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
+                writer.write(querySObjectRecord);
+            }
 
-            if (!createZeroRecordFlowFiles && recordCount == 0) {
-                session.remove(flowFile);
-            } else {
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                session.transfer(flowFile, REL_SUCCESS);
+            WriteResult writeResult = writer.finishRecordSet();
 
-                long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                session.getProvenanceReporter().receive(flowFile, 
salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject,
-                        transferMillis);
+            Map<String, String> capturedFields = 
jsonReader.getCapturedFields();
+            nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, 
null));
 
-                session.adjustCounter("Records Processed", recordCount, false);
-                getLogger().info("Successfully written {} records for {}", 
recordCount, flowFile);
-            }
-        } while (nextRecordsUrl.get() != null);
+            attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+            attributes.putAll(writeResult.getAttributes());
+            recordCountHolder.set(writeResult.getRecordCount());
+        }
+    }
+
+    private JsonTreeRowRecordReader createJsonReader(InputStream 
querySObjectResultInputStream, RecordSchema recordSchema) throws IOException, 
MalformedRecordException {
+        return new JsonTreeRowRecordReader(
+                querySObjectResultInputStream,
+                getLogger(),
+                recordSchema,
+                DATE_FORMAT,
+                TIME_FORMAT,
+                DATE_TIME_FORMAT,
+                StartingFieldStrategy.NESTED_FIELD,
+                STARTING_FIELD_NAME,
+                SchemaApplicationStrategy.SELECTED_PART,
+                CAPTURE_PREDICATE
+        );
+    }
+
+    private RecordSetWriter createRecordSetWriter(RecordSetWriterFactory 
writerFactory, Map<String, String> originalAttributes, OutputStream out,
+                                                  RecordSchema recordSchema) 
throws IOException, SchemaNotFoundException {
+        return writerFactory.createWriter(
+                getLogger(),
+                writerFactory.getSchema(
+                        originalAttributes,
+                        recordSchema
+                ),
+                out,
+                originalAttributes
+        );
     }
 
     private void processCustomQuery(ProcessContext context, ProcessSession 
session, FlowFile originalFlowFile) {
         String customQuery = 
context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
         AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
         AtomicReference<String> totalSize = new AtomicReference<>();
-        boolean isOriginalTransferred = false;
+        AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
         List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+        long startNanos = System.nanoTime();
         do {
-            FlowFile outgoingFlowFile;
             try (InputStream response = 
getResultInputStream(nextRecordsUrl.get(), customQuery)) {
-                if (originalFlowFile != null) {
-                    outgoingFlowFile = session.create(originalFlowFile);
-                } else {
-                    outgoingFlowFile = session.create();
-                }
+                FlowFile outgoingFlowFile = createOutgoingFlowFile(session, 
originalFlowFile);
                 outgoingFlowFiles.add(outgoingFlowFile);
-                outgoingFlowFile = session.write(outgoingFlowFile, 
parseHttpResponse(response, nextRecordsUrl, totalSize));
-                int recordCount = nextRecordsUrl.get() != null ? 2000 : 
Integer.parseInt(totalSize.get()) % 2000;
+                outgoingFlowFile = session.write(outgoingFlowFile, 
parseCustomQueryResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 
MAX_RECORD_COUNT : Integer.parseInt(totalSize.get()) % MAX_RECORD_COUNT;
                 Map<String, String> attributes = new HashMap<>();
                 attributes.put(CoreAttributes.MIME_TYPE.key(), 
"application/json");
-                attributes.put(TOTAL_RECORD_COUNT, 
String.valueOf(recordCount));
+                attributes.put(TOTAL_RECORD_COUNT_ATTRIBUTE, 
String.valueOf(recordCount));

Review Comment:
   Please add "total.record.count" to the @WritesAttributes section.
   
   Also, on line 496, please assign the result back to the flow file:
   
   outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to