This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 1273152d781b754d0ba0a57506fa44783bb65d1d Author: krisztina-zsihovszki <[email protected]> AuthorDate: Wed Feb 15 12:37:31 2023 +0100 NIFI-11158 PutSalesforceObject processor improvements This closes #6959. Reviewed-by: Lehel <[email protected]> Reviewed-by: Mark Bathori <[email protected]> Signed-off-by: Peter Turcsanyi <[email protected]> --- .../nifi/util/StandardProcessorTestRunner.java | 13 ++++- .../main/java/org/apache/nifi/util/TestRunner.java | 7 +++ .../processors/salesforce/PutSalesforceObject.java | 34 ++++++++--- .../salesforce/QuerySalesforceObject.java | 8 ++- .../processors/salesforce/util/RecordExtender.java | 4 +- .../salesforce/util/SalesforceRestService.java | 15 ++++- .../salesforce/PutSalesforceObjectIT.java | 67 +++++++++++++++++++--- .../salesforce/QuerySalesforceObjectIT.java | 2 + 8 files changed, 126 insertions(+), 24 deletions(-) diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index b2b0557dad..99bec62b9e 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -40,6 +40,7 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.state.MockStateManager; @@ -72,6 +73,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toSet; import static org.junit.jupiter.api.Assertions.assertEquals; public class StandardProcessorTestRunner implements TestRunner { @@ -367,7 +369,7 @@ public class StandardProcessorTestRunner implements TestRunner { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) ) - .collect(Collectors.toSet()); + .collect(toSet()); assertEquals(expectedAttributes, actualAttributes); } @@ -1056,4 +1058,13 @@ public class StandardProcessorTestRunner implements TestRunner { public void setRunSchedule(long runSchedule) { this.runSchedule = runSchedule; } + + @Override + public void assertProvenanceEvent(final ProvenanceEventType eventType) { + Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(eventType); + Set<ProvenanceEventType> actualEventTypes = getProvenanceEvents().stream() + .map(ProvenanceEventRecord::getEventType) + .collect(toSet()); + assertEquals(expectedEventTypes, actualEventTypes); + } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 7b01ed9709..dbe31e73c8 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -28,6 +28,7 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.state.MockStateManager; @@ -1062,4 +1063,10 @@ public interface TestRunner { */ void setRunSchedule(long runSchedule); + /** + * Assert that provenance event was created with the specified event type. + * + * @param eventType Provenance event type + */ + void assertProvenanceEvent(ProvenanceEventType eventType); } diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java index 00d0c2b783..83605800f9 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java @@ -20,7 +20,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.nifi.NullSuppression; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; @@ -64,9 +66,13 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert @CapabilityDescription("Creates new records for the specified Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's" + " 'objectType' attribute. This processor cannot update existing records.") @ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.") +@WritesAttribute(attribute = "error.message", description = "The error message returned by Salesforce.") +@SeeAlso(QuerySalesforceObject.class) public class PutSalesforceObject extends AbstractProcessor { private static final int MAX_RECORD_COUNT = 200; + private static final String ATTR_OBJECT_TYPE = "objectType"; + private static final String ATTR_ERROR_MESSAGE = "error.message"; protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() .name("record-reader") @@ -138,15 +144,19 @@ public class PutSalesforceObject extends AbstractProcessor { return; } - String objectType = flowFile.getAttribute("objectType"); + String objectType = flowFile.getAttribute(ATTR_OBJECT_TYPE); if (objectType == null) { - throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes"); + getLogger().error("Salesforce object type not found among the incoming FlowFile attributes"); + flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, "Salesforce object type not found among FlowFile attributes"); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; } RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class); RecordExtender extender; - + long startNanos = System.nanoTime(); + try { try (InputStream in = session.read(flowFile); RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger()); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -170,24 +180,30 @@ public class PutSalesforceObject extends AbstractProcessor { out.reset(); } } - if (writer.isActiveRecordSet()) { processRecords(objectType, out, writer, extender); } - session.transfer(flowFile, REL_SUCCESS); - + } + session.transfer(flowFile, REL_SUCCESS); + long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, salesforceRestService.getVersionedBaseUrl()+ "/composite/tree/" + objectType, transferMillis); } catch (MalformedRecordException e) { getLogger().error("Couldn't read records from input", e); - session.transfer(flowFile, REL_FAILURE); + transferToFailure(session, flowFile, e); } catch (SchemaNotFoundException e) { getLogger().error("Couldn't create record writer", e); - session.transfer(flowFile, REL_FAILURE); + transferToFailure(session, flowFile, e); } catch (Exception e) { getLogger().error("Failed to put records to Salesforce.", e); - session.transfer(flowFile, REL_FAILURE); + transferToFailure(session, flowFile, e); } } + private void transferToFailure(ProcessSession session, FlowFile flowFile, Exception e) { + flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, e.getMessage()); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } + private void processRecords(String objectType, ByteArrayOutputStream out, WriteJsonResult writer, RecordExtender extender) throws IOException { writer.finishRecordSet(); writer.flush(); diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java index 8235375d69..b5125a9eb1 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java @@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; @@ -104,6 +105,7 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.") }) @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") +@SeeAlso(PutSalesforceObject.class) public class QuerySalesforceObject extends AbstractProcessor { static final PropertyDescriptor SOBJECT_NAME = new PropertyDescriptor.Builder() @@ -314,7 +316,7 @@ public class QuerySalesforceObject extends AbstractProcessor { Map<String, String> attributes = new HashMap<>(); AtomicInteger recordCountHolder = new AtomicInteger(); - + long startNanos = System.nanoTime(); flowFile = session.write(flowFile, out -> { try ( InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl, querySObject); @@ -381,6 +383,10 @@ public class QuerySalesforceObject extends AbstractProcessor { flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); + long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().receive(flowFile, salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject, + transferMillis); + session.adjustCounter("Records Processed", recordCount, false); getLogger().info("Successfully written {} records for {}", recordCount, flowFile); } diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java index 9e8b03e9d5..6af98cd725 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java @@ -66,7 +66,9 @@ public class RecordExtender { public MapRecord getExtendedRecord(String objectType, int count, Record record) { - Set<String> rawFieldNames = record.getRawFieldNames(); + Set<String> rawFieldNames = record.getRawFieldNames().stream() + .filter(fieldName -> record.getValue(fieldName) != null) + .collect(Collectors.toSet()); Map<String, Object> objectMap = rawFieldNames.stream() .collect(Collectors.toMap(Function.identity(), record::getValue)); diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java index cc3f4b2bf8..0affbc441e 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java @@ -44,7 +44,7 @@ public class SalesforceRestService { } public InputStream describeSObject(String sObject) { - String url = baseUrl + "/services/data/v" + version + "/sobjects/" + sObject + "/describe?maxRecords=1"; + String url = getVersionedBaseUrl() + "/sobjects/" + sObject + "/describe?maxRecords=1"; Request request = new Request.Builder() .addHeader("Authorization", "Bearer " + accessTokenProvider.get()) @@ -56,7 +56,7 @@ public class SalesforceRestService { } public InputStream query(String query) { - String url = baseUrl + "/services/data/v" + version + "/query"; + String url = getVersionedBaseUrl() + "/query"; HttpUrl httpUrl = HttpUrl.get(url).newBuilder() .addQueryParameter("q", query) @@ -87,7 +87,7 @@ public class SalesforceRestService { } public InputStream postRecord(String sObjectApiName, String body) { - String url = baseUrl + "/services/data/v" + version + "/composite/tree/" + sObjectApiName; + String url = getVersionedBaseUrl() + "/composite/tree/" + sObjectApiName; HttpUrl httpUrl = HttpUrl.get(url).newBuilder() .build(); @@ -103,6 +103,10 @@ public class SalesforceRestService { return request(request); } + public String getVersionedBaseUrl() { + return baseUrl + "/services/data/v" + version; + } + private InputStream request(Request request) { Response response = null; try { @@ -115,6 +119,11 @@ public class SalesforceRestService { ); } return response.body().byteStream(); + } catch (ProcessException e) { + if (response != null) { + response.close(); + } + throw e; } catch (Exception e) { if (response != null) { response.close(); diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java index 390fa8ac0f..2c296a119f 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java @@ -20,6 +20,8 @@ import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties; import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.util.MockFlowFile; @@ -32,6 +34,7 @@ import java.util.Collections; import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; class PutSalesforceObjectIT implements SalesforceConfigAware { @@ -62,25 +65,71 @@ class PutSalesforceObjectIT implements SalesforceConfigAware { reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING); reader.addSchemaField("industry", RecordFieldType.STRING); - reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", "100", "Banking"); - reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", "200", "Banking"); + reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", null, "Banking"); + reader.addRecord("SampleAccount2", null, "www.salesforce2.com", "200", "Banking"); reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", "300", "Banking"); - reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", "400", "Banking"); + reader.addRecord("SampleAccount4", "444444", null, "400", "Banking"); reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", "500", "Banking"); runner.enqueue("", Collections.singletonMap("objectType", "Account")); - runner.addControllerService("reader", reader); - runner.enableControllerService(reader); - - runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION); - runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL); - runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, reader.getIdentifier()); + configureProcessor(reader); runner.run(); List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_SUCCESS); assertEquals(1, results.size()); + + runner.assertProvenanceEvent(ProvenanceEventType.SEND); + } + + @Test + void testMissingObjectType() throws Exception { + MockRecordParser reader = new MockRecordParser(); + + runner.enqueue(""); + + configureProcessor(reader); + + runner.run(); + + List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE); + assertEquals(1, results.size()); + assertTrue(runner.getProvenanceEvents().isEmpty()); + + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeExists("error.message"); + } + + @Test + void testErrorForInvalidRecordField() throws Exception { + MockRecordParser reader = new MockRecordParser(); + reader.addSchemaField("invalidField", RecordFieldType.STRING); + reader.addRecord("invalidField"); + + runner.enqueue("", Collections.singletonMap("objectType", "Account")); + + configureProcessor(reader); + + runner.run(); + + List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE); + assertEquals(1, results.size()); + assertTrue(runner.getProvenanceEvents().isEmpty()); + + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeExists("error.message"); + } + + private void configureProcessor(final MockRecordParser reader) throws InitializationException { + runner.addControllerService("reader", reader); + runner.enableControllerService(reader); + + runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION); + runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL); + runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, reader.getIdentifier()); } } diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java index f3f5bd50eb..661df2673a 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java @@ -20,6 +20,7 @@ import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties; import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.util.MockFlowFile; @@ -76,5 +77,6 @@ class QuerySalesforceObjectIT implements SalesforceConfigAware { List<MockFlowFile> results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS); assertNotNull(results.get(0).getContent()); + runner.assertProvenanceEvent(ProvenanceEventType.RECEIVE); } }
