krisztina-zsihovszki commented on code in PR #7019:
URL: https://github.com/apache/nifi/pull/7019#discussion_r1129775136
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -385,144 +354,181 @@ private void processQuery(ProcessContext context,
ProcessSession session) {
.collect(Collectors.joining(","));
}
- String querySObject = buildQuery(
- sObject,
- fields,
- customWhereClause,
- ageField,
- initialAgeFilter,
- ageFilterLower,
- ageFilterUpper
- );
+ String querySObject = new SalesforceQueryBuilder(incrementalContext)
+ .buildQuery(sObject, fields, customWhereClause);
+
+ AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
+ List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+ Map<String, String> originalAttributes =
Optional.ofNullable(originalFlowFile)
+ .map(FlowFile::getAttributes)
+ .orElseGet(HashMap::new);
+
+ long startNanos = System.nanoTime();
do {
- FlowFile flowFile = session.create();
- Map<String, String> originalAttributes = flowFile.getAttributes();
- Map<String, String> attributes = new HashMap<>();
+ FlowFile outgoingFlowFile = createOutgoingFlowFile(session,
originalFlowFile);
+ outgoingFlowFiles.add(outgoingFlowFile);
+ Map<String, String> attributes = new HashMap<>(originalAttributes);
AtomicInteger recordCountHolder = new AtomicInteger();
- long startNanos = System.nanoTime();
- flowFile = session.write(flowFile, out -> {
- try (
- InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl.get(), querySObject);
-
- JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
- querySObjectResultInputStream,
- getLogger(),
- salesForceSchemaHolder.recordSchema,
- DATE_FORMAT,
- TIME_FORMAT,
- DATE_TIME_FORMAT,
- StartingFieldStrategy.NESTED_FIELD,
- STARTING_FIELD_NAME,
- SchemaApplicationStrategy.SELECTED_PART,
- CAPTURE_PREDICATE
- );
-
- RecordSetWriter writer = writerFactory.createWriter(
- getLogger(),
- writerFactory.getSchema(
- originalAttributes,
- salesForceSchemaHolder.recordSchema
- ),
- out,
- originalAttributes
- )
- ) {
- writer.beginRecordSet();
-
- Record querySObjectRecord;
- while ((querySObjectRecord = jsonReader.nextRecord()) !=
null) {
- writer.write(querySObjectRecord);
- }
-
- WriteResult writeResult = writer.finishRecordSet();
+ try {
+ outgoingFlowFile = session.write(outgoingFlowFile,
processRecordsCallback(context, nextRecordsUrl, writerFactory, state,
incrementalContext,
+ salesForceSchemaHolder, querySObject,
originalAttributes, attributes, recordCountHolder));
+ int recordCount = recordCountHolder.get();
- Map<String, String> capturedFields =
jsonReader.getCapturedFields();
+ if (createZeroRecordFlowFiles || recordCount != 0) {
+ outgoingFlowFile =
session.putAllAttributes(outgoingFlowFile, attributes);
-
nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
+ session.adjustCounter("Records Processed", recordCount,
false);
+ getLogger().info("Successfully written {} records for {}",
recordCount, outgoingFlowFile);
+ } else {
+ outgoingFlowFiles.remove(outgoingFlowFile);
+ session.remove(outgoingFlowFile);
+ }
+ } catch (Exception e) {
+ if (e.getCause() instanceof IOException) {
+ throw new ProcessException("Couldn't get Salesforce
records", e);
+ } else if (e.getCause() instanceof SchemaNotFoundException) {
+ handleError(session, originalFlowFile,
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't create record writer");
+ } else if (e.getCause() instanceof MalformedRecordException) {
+ handleError(session, originalFlowFile,
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't read records from
input");
+ } else {
+ handleError(session, originalFlowFile,
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't get Salesforce records");
+ }
+ break;
+ }
+ } while (nextRecordsUrl.get() != null);
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
+ transferFlowFiles(session, outgoingFlowFiles, originalFlowFile,
isOriginalTransferred, startNanos, sObject);
+ }
- recordCountHolder.set(writeResult.getRecordCount());
+ private OutputStreamCallback processRecordsCallback(ProcessContext
context, AtomicReference<String> nextRecordsUrl, RecordSetWriterFactory
writerFactory,
+ StateMap state,
IncrementalContext incrementalContext, SalesforceSchemaHolder
salesForceSchemaHolder,
+ String querySObject,
Map<String, String> originalAttributes, Map<String, String> attributes,
+ AtomicInteger
recordCountHolder) {
+ return out -> {
+ try {
+ handleRecordSet(out, nextRecordsUrl, querySObject,
writerFactory, salesForceSchemaHolder, originalAttributes, attributes,
recordCountHolder);
- if (ageFilterUpper != null) {
- Map<String, String> newState = new
HashMap<>(state.toMap());
- newState.put(LAST_AGE_FILTER, ageFilterUpper);
- updateState(context, newState);
- }
- } catch (SchemaNotFoundException e) {
- throw new ProcessException("Couldn't create record
writer", e);
- } catch (MalformedRecordException e) {
- throw new ProcessException("Couldn't read records from
input", e);
+ if (incrementalContext.getAgeFilterUpper() != null) {
+ Map<String, String> newState = new
HashMap<>(state.toMap());
+ newState.put(LAST_AGE_FILTER,
incrementalContext.getAgeFilterUpper());
+ updateState(context, newState);
}
- });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
- int recordCount = recordCountHolder.get();
+ private void handleRecordSet(OutputStream out, AtomicReference<String>
nextRecordsUrl, String querySObject, RecordSetWriterFactory writerFactory,
+ SalesforceSchemaHolder
salesForceSchemaHolder, Map<String, String> originalAttributes, Map<String,
String> attributes,
+ AtomicInteger recordCountHolder) throws
Exception {
+ try (
+ InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl.get(), querySObject);
+ JsonTreeRowRecordReader jsonReader =
createJsonReader(querySObjectResultInputStream,
salesForceSchemaHolder.getRecordSchema());
+ RecordSetWriter writer = createRecordSetWriter(writerFactory,
originalAttributes, out, salesForceSchemaHolder.getRecordSchema())
+ ) {
+ writer.beginRecordSet();
+
+ Record querySObjectRecord;
+ while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
+ writer.write(querySObjectRecord);
+ }
- if (!createZeroRecordFlowFiles && recordCount == 0) {
- session.remove(flowFile);
- } else {
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
+ WriteResult writeResult = writer.finishRecordSet();
- long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().receive(flowFile,
salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject,
- transferMillis);
+ Map<String, String> capturedFields =
jsonReader.getCapturedFields();
+ nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL,
null));
- session.adjustCounter("Records Processed", recordCount, false);
- getLogger().info("Successfully written {} records for {}",
recordCount, flowFile);
- }
- } while (nextRecordsUrl.get() != null);
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ recordCountHolder.set(writeResult.getRecordCount());
+ }
+ }
+
+ private JsonTreeRowRecordReader createJsonReader(InputStream
querySObjectResultInputStream, RecordSchema recordSchema) throws IOException,
MalformedRecordException {
+ return new JsonTreeRowRecordReader(
+ querySObjectResultInputStream,
+ getLogger(),
+ recordSchema,
+ DATE_FORMAT,
+ TIME_FORMAT,
+ DATE_TIME_FORMAT,
+ StartingFieldStrategy.NESTED_FIELD,
+ STARTING_FIELD_NAME,
+ SchemaApplicationStrategy.SELECTED_PART,
+ CAPTURE_PREDICATE
+ );
+ }
+
+ private RecordSetWriter createRecordSetWriter(RecordSetWriterFactory
writerFactory, Map<String, String> originalAttributes, OutputStream out,
+ RecordSchema recordSchema)
throws IOException, SchemaNotFoundException {
+ return writerFactory.createWriter(
+ getLogger(),
+ writerFactory.getSchema(
+ originalAttributes,
+ recordSchema
+ ),
+ out,
+ originalAttributes
+ );
}
private void processCustomQuery(ProcessContext context, ProcessSession
session, FlowFile originalFlowFile) {
String customQuery =
context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
AtomicReference<String> totalSize = new AtomicReference<>();
- boolean isOriginalTransferred = false;
+ AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+ long startNanos = System.nanoTime();
do {
- FlowFile outgoingFlowFile;
try (InputStream response =
getResultInputStream(nextRecordsUrl.get(), customQuery)) {
- if (originalFlowFile != null) {
- outgoingFlowFile = session.create(originalFlowFile);
- } else {
- outgoingFlowFile = session.create();
- }
+ FlowFile outgoingFlowFile = createOutgoingFlowFile(session,
originalFlowFile);
outgoingFlowFiles.add(outgoingFlowFile);
- outgoingFlowFile = session.write(outgoingFlowFile,
parseHttpResponse(response, nextRecordsUrl, totalSize));
- int recordCount = nextRecordsUrl.get() != null ? 2000 :
Integer.parseInt(totalSize.get()) % 2000;
+ outgoingFlowFile = session.write(outgoingFlowFile,
parseCustomQueryResponse(response, nextRecordsUrl, totalSize));
+ int recordCount = nextRecordsUrl.get() != null ?
MAX_RECORD_COUNT : Integer.parseInt(totalSize.get()) % MAX_RECORD_COUNT;
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(),
"application/json");
- attributes.put(TOTAL_RECORD_COUNT,
String.valueOf(recordCount));
+ attributes.put(TOTAL_RECORD_COUNT_ATTRIBUTE,
String.valueOf(recordCount));
Review Comment:
Please add "total.record.count" to the @WritesAttributes section.
Please use
outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
on line 496.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]