krisztina-zsihovszki commented on code in PR #7019:
URL: https://github.com/apache/nifi/pull/7019#discussion_r1138356484
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -385,144 +354,181 @@ private void processQuery(ProcessContext context,
ProcessSession session) {
.collect(Collectors.joining(","));
}
- String querySObject = buildQuery(
- sObject,
- fields,
- customWhereClause,
- ageField,
- initialAgeFilter,
- ageFilterLower,
- ageFilterUpper
- );
+ String querySObject = new SalesforceQueryBuilder(incrementalContext)
+ .buildQuery(sObject, fields, customWhereClause);
+
+ AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
+ List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+ Map<String, String> originalAttributes =
Optional.ofNullable(originalFlowFile)
+ .map(FlowFile::getAttributes)
+ .orElseGet(HashMap::new);
+
+ long startNanos = System.nanoTime();
do {
- FlowFile flowFile = session.create();
- Map<String, String> originalAttributes = flowFile.getAttributes();
- Map<String, String> attributes = new HashMap<>();
+ FlowFile outgoingFlowFile = createOutgoingFlowFile(session,
originalFlowFile);
+ outgoingFlowFiles.add(outgoingFlowFile);
+ Map<String, String> attributes = new HashMap<>(originalAttributes);
AtomicInteger recordCountHolder = new AtomicInteger();
- long startNanos = System.nanoTime();
- flowFile = session.write(flowFile, out -> {
- try (
- InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl.get(), querySObject);
-
- JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
- querySObjectResultInputStream,
- getLogger(),
- salesForceSchemaHolder.recordSchema,
- DATE_FORMAT,
- TIME_FORMAT,
- DATE_TIME_FORMAT,
- StartingFieldStrategy.NESTED_FIELD,
- STARTING_FIELD_NAME,
- SchemaApplicationStrategy.SELECTED_PART,
- CAPTURE_PREDICATE
- );
-
- RecordSetWriter writer = writerFactory.createWriter(
- getLogger(),
- writerFactory.getSchema(
- originalAttributes,
- salesForceSchemaHolder.recordSchema
- ),
- out,
- originalAttributes
- )
- ) {
- writer.beginRecordSet();
-
- Record querySObjectRecord;
- while ((querySObjectRecord = jsonReader.nextRecord()) !=
null) {
- writer.write(querySObjectRecord);
- }
-
- WriteResult writeResult = writer.finishRecordSet();
+ try {
+ outgoingFlowFile = session.write(outgoingFlowFile,
processRecordsCallback(context, nextRecordsUrl, writerFactory, state,
incrementalContext,
+ salesForceSchemaHolder, querySObject,
originalAttributes, attributes, recordCountHolder));
+ int recordCount = recordCountHolder.get();
- Map<String, String> capturedFields =
jsonReader.getCapturedFields();
+ if (createZeroRecordFlowFiles || recordCount != 0) {
+ outgoingFlowFile =
session.putAllAttributes(outgoingFlowFile, attributes);
-
nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
+ session.adjustCounter("Records Processed", recordCount,
false);
+ getLogger().info("Successfully written {} records for {}",
recordCount, outgoingFlowFile);
+ } else {
+ outgoingFlowFiles.remove(outgoingFlowFile);
+ session.remove(outgoingFlowFile);
+ }
+ } catch (Exception e) {
+ if (e.getCause() instanceof IOException) {
+ throw new ProcessException("Couldn't get Salesforce
records", e);
+ } else if (e.getCause() instanceof SchemaNotFoundException) {
+ handleError(session, originalFlowFile,
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't create record writer");
+ } else if (e.getCause() instanceof MalformedRecordException) {
+ handleError(session, originalFlowFile,
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't read records from
input");
+ } else {
+ handleError(session, originalFlowFile,
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't get Salesforce records");
+ }
+ break;
+ }
+ } while (nextRecordsUrl.get() != null);
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
+ transferFlowFiles(session, outgoingFlowFiles, originalFlowFile,
isOriginalTransferred, startNanos, sObject);
+ }
- recordCountHolder.set(writeResult.getRecordCount());
+ private OutputStreamCallback processRecordsCallback(ProcessContext
context, AtomicReference<String> nextRecordsUrl, RecordSetWriterFactory
writerFactory,
+ StateMap state,
IncrementalContext incrementalContext, SalesforceSchemaHolder
salesForceSchemaHolder,
+ String querySObject,
Map<String, String> originalAttributes, Map<String, String> attributes,
+ AtomicInteger
recordCountHolder) {
+ return out -> {
+ try {
+ handleRecordSet(out, nextRecordsUrl, querySObject,
writerFactory, salesForceSchemaHolder, originalAttributes, attributes,
recordCountHolder);
- if (ageFilterUpper != null) {
- Map<String, String> newState = new
HashMap<>(state.toMap());
- newState.put(LAST_AGE_FILTER, ageFilterUpper);
- updateState(context, newState);
- }
- } catch (SchemaNotFoundException e) {
- throw new ProcessException("Couldn't create record
writer", e);
- } catch (MalformedRecordException e) {
- throw new ProcessException("Couldn't read records from
input", e);
+ if (incrementalContext.getAgeFilterUpper() != null) {
+ Map<String, String> newState = new
HashMap<>(state.toMap());
+ newState.put(LAST_AGE_FILTER,
incrementalContext.getAgeFilterUpper());
+ updateState(context, newState);
}
- });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
- int recordCount = recordCountHolder.get();
+ private void handleRecordSet(OutputStream out, AtomicReference<String>
nextRecordsUrl, String querySObject, RecordSetWriterFactory writerFactory,
+ SalesforceSchemaHolder
salesForceSchemaHolder, Map<String, String> originalAttributes, Map<String,
String> attributes,
+ AtomicInteger recordCountHolder) throws
Exception {
+ try (
+ InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl.get(), querySObject);
+ JsonTreeRowRecordReader jsonReader =
createJsonReader(querySObjectResultInputStream,
salesForceSchemaHolder.getRecordSchema());
+ RecordSetWriter writer = createRecordSetWriter(writerFactory,
originalAttributes, out, salesForceSchemaHolder.getRecordSchema())
+ ) {
+ writer.beginRecordSet();
+
+ Record querySObjectRecord;
+ while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
+ writer.write(querySObjectRecord);
+ }
- if (!createZeroRecordFlowFiles && recordCount == 0) {
- session.remove(flowFile);
- } else {
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
+ WriteResult writeResult = writer.finishRecordSet();
- long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().receive(flowFile,
salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject,
- transferMillis);
+ Map<String, String> capturedFields =
jsonReader.getCapturedFields();
+ nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL,
null));
- session.adjustCounter("Records Processed", recordCount, false);
- getLogger().info("Successfully written {} records for {}",
recordCount, flowFile);
- }
- } while (nextRecordsUrl.get() != null);
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ recordCountHolder.set(writeResult.getRecordCount());
+ }
+ }
+
+ private JsonTreeRowRecordReader createJsonReader(InputStream
querySObjectResultInputStream, RecordSchema recordSchema) throws IOException,
MalformedRecordException {
+ return new JsonTreeRowRecordReader(
+ querySObjectResultInputStream,
+ getLogger(),
+ recordSchema,
+ DATE_FORMAT,
+ TIME_FORMAT,
+ DATE_TIME_FORMAT,
+ StartingFieldStrategy.NESTED_FIELD,
+ STARTING_FIELD_NAME,
+ SchemaApplicationStrategy.SELECTED_PART,
+ CAPTURE_PREDICATE
+ );
+ }
+
+ private RecordSetWriter createRecordSetWriter(RecordSetWriterFactory
writerFactory, Map<String, String> originalAttributes, OutputStream out,
+ RecordSchema recordSchema)
throws IOException, SchemaNotFoundException {
+ return writerFactory.createWriter(
+ getLogger(),
+ writerFactory.getSchema(
+ originalAttributes,
+ recordSchema
+ ),
+ out,
+ originalAttributes
+ );
}
private void processCustomQuery(ProcessContext context, ProcessSession
session, FlowFile originalFlowFile) {
String customQuery =
context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
AtomicReference<String> totalSize = new AtomicReference<>();
- boolean isOriginalTransferred = false;
+ AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+ long startNanos = System.nanoTime();
do {
- FlowFile outgoingFlowFile;
try (InputStream response =
getResultInputStream(nextRecordsUrl.get(), customQuery)) {
- if (originalFlowFile != null) {
- outgoingFlowFile = session.create(originalFlowFile);
- } else {
- outgoingFlowFile = session.create();
- }
+ FlowFile outgoingFlowFile = createOutgoingFlowFile(session,
originalFlowFile);
outgoingFlowFiles.add(outgoingFlowFile);
- outgoingFlowFile = session.write(outgoingFlowFile,
parseHttpResponse(response, nextRecordsUrl, totalSize));
- int recordCount = nextRecordsUrl.get() != null ? 2000 :
Integer.parseInt(totalSize.get()) % 2000;
+ outgoingFlowFile = session.write(outgoingFlowFile,
parseCustomQueryResponse(response, nextRecordsUrl, totalSize));
+ int recordCount = nextRecordsUrl.get() != null ?
MAX_RECORD_COUNT : Integer.parseInt(totalSize.get()) % MAX_RECORD_COUNT;
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(),
"application/json");
- attributes.put(TOTAL_RECORD_COUNT,
String.valueOf(recordCount));
+ attributes.put(TOTAL_RECORD_COUNT_ATTRIBUTE,
String.valueOf(recordCount));
session.adjustCounter("Salesforce records processed",
recordCount, false);
session.putAllAttributes(outgoingFlowFile, attributes);
} catch (IOException e) {
throw new ProcessException("Couldn't get Salesforce records",
e);
} catch (Exception e) {
- if (originalFlowFile != null) {
- session.transfer(originalFlowFile, REL_FAILURE);
- isOriginalTransferred = true;
- }
- getLogger().error("Couldn't get Salesforce records", e);
- session.remove(outgoingFlowFiles);
- outgoingFlowFiles.clear();
+ handleError(session, originalFlowFile, isOriginalTransferred,
outgoingFlowFiles, e, "Couldn't get Salesforce records");
break;
}
} while (nextRecordsUrl.get() != null);
+ transferFlowFiles(session, outgoingFlowFiles, originalFlowFile,
isOriginalTransferred, startNanos, "custom");
+ }
+
+ private void transferFlowFiles(ProcessSession session, List<FlowFile>
outgoingFlowFiles, FlowFile originalFlowFile, AtomicBoolean
isOriginalTransferred,
+ long startNanos, String urlDetail) {
if (!outgoingFlowFiles.isEmpty()) {
session.transfer(outgoingFlowFiles, REL_SUCCESS);
+ long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+
+ outgoingFlowFiles.forEach(ff ->
+ session.getProvenanceReporter().receive(ff,
salesforceRestService.getVersionedBaseUrl() + urlDetail, transferMillis)
Review Comment:
Wrong URI is created for the custom query — a '/' separator is missing, producing e.g.:
"https://xxx.my.salesforce.com/services/data/v54.0custom"
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java:
##########
@@ -146,69 +136,85 @@ public void onTrigger(ProcessContext context,
ProcessSession session) throws Pro
String objectType = flowFile.getAttribute(ATTR_OBJECT_TYPE);
if (objectType == null) {
- getLogger().error("Salesforce object type not found among the
incoming FlowFile attributes");
- flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE,
"Salesforce object type not found among FlowFile attributes");
- session.transfer(session.penalize(flowFile), REL_FAILURE);
+ handleInvalidFlowFile(session, flowFile);
return;
}
+ try {
+ long startNanos = System.nanoTime();
+ processRecords(flowFile, objectType, context, session);
+ session.transfer(flowFile, REL_SUCCESS);
+ long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile,
salesforceRestClient.getVersionedBaseUrl() + "/put/" + objectType,
transferMillis);
+ } catch (MalformedRecordException e) {
+ getLogger().error("Couldn't read records from input", e);
+ transferToFailure(session, flowFile, e);
+ } catch (SchemaNotFoundException e) {
+ getLogger().error("Couldn't create record writer", e);
+ transferToFailure(session, flowFile, e);
+ } catch (Exception e) {
+ getLogger().error("Failed to put records to Salesforce.", e);
+ transferToFailure(session, flowFile, e);
+ }
+ }
+
+ private void processRecords(FlowFile flowFile, String objectType,
ProcessContext context, ProcessSession session) throws IOException,
MalformedRecordException, SchemaNotFoundException {
RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+ int count = 0;
+ RecordExtender recordExtender;
- RecordExtender extender;
- long startNanos = System.nanoTime();
- try {
try (InputStream in = session.read(flowFile);
RecordReader reader = readerFactory.createRecordReader(flowFile,
in, getLogger());
ByteArrayOutputStream out = new ByteArrayOutputStream();
- WriteJsonResult writer = getWriter(extender = new
RecordExtender(reader.getSchema()), out)) {
+ WriteJsonResult writer = getWriter(recordExtender =
getExtender(reader), out)) {
- int count = 0;
Record record;
-
while ((record = reader.nextRecord()) != null) {
count++;
if (!writer.isActiveRecordSet()) {
writer.beginRecordSet();
}
- MapRecord extendedRecord =
extender.getExtendedRecord(objectType, count, record);
+ MapRecord extendedRecord =
recordExtender.getExtendedRecord(objectType, count, record);
writer.write(extendedRecord);
if (count == maxRecordCount) {
count = 0;
- processRecords(objectType, out, writer, extender);
+ postRecordBatch(objectType, out, writer, recordExtender);
out.reset();
}
}
if (writer.isActiveRecordSet()) {
- processRecords(objectType, out, writer, extender);
+ postRecordBatch(objectType, out, writer, recordExtender);
}
- }
- session.transfer(flowFile, REL_SUCCESS);
- long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().send(flowFile,
salesforceRestService.getVersionedBaseUrl()+ "/composite/tree/" + objectType,
transferMillis);
- } catch (MalformedRecordException e) {
- getLogger().error("Couldn't read records from input", e);
- transferToFailure(session, flowFile, e);
- } catch (SchemaNotFoundException e) {
- getLogger().error("Couldn't create record writer", e);
- transferToFailure(session, flowFile, e);
- } catch (Exception e) {
- getLogger().error("Failed to put records to Salesforce.", e);
- transferToFailure(session, flowFile, e);
}
}
+ private SalesforceConfiguration
createSalesforceConfiguration(ProcessContext context) {
+ String salesforceVersion = context.getProperty(API_VERSION).getValue();
+ String instanceUrl =
context.getProperty(SALESFORCE_INSTANCE_URL).getValue();
+ OAuth2AccessTokenProvider accessTokenProvider =
+
context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ return SalesforceConfiguration.create(instanceUrl, salesforceVersion,
+ () -> accessTokenProvider.getAccessDetails().getAccessToken(),
0);
+ }
+
+ private void handleInvalidFlowFile(ProcessSession session, FlowFile
flowFile) {
Review Comment:
The handleInvalidFlowFile and transferToFailure methods are quite similar.
You could change transferToFailure slightly and reuse it in handleInvalidFlowFile.
(transferToFailure currently uses only the message from the exception parameter.)
For example:
private void transferToFailure(ProcessSession session, FlowFile flowFile,
String message) {
flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE,
message);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
private void handleInvalidFlowFile(ProcessSession session, FlowFile
flowFile) {
getLogger().error("Salesforce object type not found among the
incoming FlowFile attributes");
transferToFailure(session, flowFile, "Salesforce object type not
found among FlowFile attributes");
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]