tpalfy commented on code in PR #6670: URL: https://github.com/apache/nifi/pull/6670#discussion_r1028016451
########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; + +import static org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class TestRecordExtender { + + private static RecordExtender EXTENDER; + private static ObjectMapper OBJECT_MAPPER; + private static RecordSchema TEST_RECORD_SCHEMA; + private 
static RecordSchema EXTENDED_TEST_RECORD_SCHEMA; + + @BeforeAll + public static void setup() { + EXTENDER = new RecordExtender(); + OBJECT_MAPPER = new ObjectMapper(); + TEST_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList( + new RecordField("testRecordField1", RecordFieldType.STRING.getDataType()), + new RecordField("testRecordField2", RecordFieldType.STRING.getDataType()) + )); + EXTENDED_TEST_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList( + new RecordField("testRecordField1", RecordFieldType.STRING.getDataType()), + new RecordField("testRecordField2", RecordFieldType.STRING.getDataType()), + new RecordField("attributes", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList( + new RecordField("type", RecordFieldType.STRING.getDataType()), + new RecordField("referenceId", RecordFieldType.STRING.getDataType() + ))))) + )); + } + + @Test + void testGetWrappedRecordJson() throws IOException { + ObjectNode testNode = OBJECT_MAPPER.createObjectNode(); + testNode.put("testField1", "testValue1"); + testNode.put("testField2", "testValue2"); + + ObjectNode expectedWrappedNode = OBJECT_MAPPER.createObjectNode(); + expectedWrappedNode.set("records", testNode); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(testNode.toString().getBytes()); + + ObjectNode actualWrappedJson = EXTENDER.getWrappedRecordsJson(out); + + assertEquals(expectedWrappedNode, actualWrappedJson); + } + + @Test + void testGetExtendedSchema() { + final SimpleRecordSchema actualRecordSchema = EXTENDER.getExtendedSchema(TEST_RECORD_SCHEMA); + + assertEquals(EXTENDED_TEST_RECORD_SCHEMA, actualRecordSchema); Review Comment: ```suggestion final SimpleRecordSchema actualExtendedSchema = EXTENDER.getExtendedSchema(TEST_RECORD_SCHEMA); assertEquals(EXPECTED_EXTENDED_SCHEMA, actualExtendedSchema); ``` ########## 
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce; + +import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class PutSalesforceRecordIT implements SalesforceConfigAware { + + private TestRunner runner; + + @BeforeEach + void setUp() 
throws Exception { + int maxRecordCount = 2; + Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount); + + runner = TestRunners.newTestRunner(putSalesforceRecord); + + StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner); + runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier()); Review Comment: ```suggestion runner.setProperty(PutSalesforceRecord.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier()); ``` ########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.salesforce; + +import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class PutSalesforceRecordIT implements SalesforceConfigAware { + + private TestRunner runner; + + @BeforeEach + void setUp() throws Exception { + int maxRecordCount = 2; + Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount); Review Comment: Suggestion: We don't necessarily need a new class for this. `Custom...` is not a particularly descriptive name. An anonymous class shows the intent in a more streamlined way in my opinion. ```suggestion Processor putSalesforceRecord = new PutSalesforceRecord() { @Override int getMaxRecordCount() { return 2; } }; ``` ########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce; + +import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class PutSalesforceRecordIT implements SalesforceConfigAware { + + private TestRunner runner; + + @BeforeEach + void setUp() throws Exception { + int maxRecordCount = 2; + Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount); + + runner = TestRunners.newTestRunner(putSalesforceRecord); + + StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner); + runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier()); + 
} + + @Test + void testPutSalesforceRecord() throws Exception { + MockComponentLog mockComponentLog = new MockComponentLog("id1", "testPutSalesforceRecord"); + InputStream in = readFile("src/test/resources/json/put_records.json"); + + MockFlowFile flowFile = new MockFlowFile(1L); + byte[] fileContent = Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json")); + flowFile.setData(fileContent); + flowFile.putAttributes(Collections.singletonMap("objectType", "Account")); Review Comment: I think this part (as well as the later `reader.createRecordReader(flowFile, in, mockComponentLog);` call) is not needed. When you mock the records, the input data is going to be ignored. ```suggestion ``` ########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.salesforce.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; + +import static org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class TestRecordExtender { + + private static RecordExtender EXTENDER; + private static ObjectMapper OBJECT_MAPPER; + private static RecordSchema TEST_RECORD_SCHEMA; + private static RecordSchema EXTENDED_TEST_RECORD_SCHEMA; Review Comment: ```suggestion private static RecordSchema ORIGINAL_SCHEMA; private static RecordSchema EXPECTED_EXTENDED_SCHEMA; ``` ########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce; + +import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class PutSalesforceRecordIT implements SalesforceConfigAware { + + private TestRunner runner; + + @BeforeEach + void setUp() throws Exception { + int maxRecordCount = 2; + Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount); + + runner = TestRunners.newTestRunner(putSalesforceRecord); + + StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner); + runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier()); + } + + @Test + void testPutSalesforceRecord() throws Exception { + MockComponentLog mockComponentLog = new MockComponentLog("id1", "testPutSalesforceRecord"); + 
InputStream in = readFile("src/test/resources/json/put_records.json"); + + MockFlowFile flowFile = new MockFlowFile(1L); + byte[] fileContent = Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json")); + flowFile.setData(fileContent); + flowFile.putAttributes(Collections.singletonMap("objectType", "Account")); + + MockRecordParser reader = new MockRecordParser(); + reader.addSchemaField("name", RecordFieldType.STRING); + reader.addSchemaField("phone", RecordFieldType.STRING); + reader.addSchemaField("website", RecordFieldType.STRING); + reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING); + reader.addSchemaField("industry", RecordFieldType.STRING); + + reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", "100", "Banking"); + reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", "200", "Banking"); + reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", "300", "Banking"); + reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", "400", "Banking"); + reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", "500", "Banking"); + + reader.createRecordReader(flowFile, in, mockComponentLog); + + runner.addControllerService("reader", reader); + runner.enableControllerService(reader); + + runner.setProperty(PutSalesforceRecord.API_VERSION, VERSION); + runner.setProperty(PutSalesforceRecord.API_URL, BASE_URL); + runner.setProperty(PutSalesforceRecord.RECORD_READER_FACTORY, reader.getIdentifier()); + + + runner.enqueue(flowFile); Review Comment: With my previous suggestion. ```suggestion runner.enqueue("", Collections.singletonMap("objectType", "Account")); ``` ########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce; + +import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class PutSalesforceRecordIT implements SalesforceConfigAware { + + private TestRunner runner; + + @BeforeEach + void setUp() throws Exception { + int maxRecordCount = 2; + Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount); + + runner = TestRunners.newTestRunner(putSalesforceRecord); + + StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = 
initOAuth2AccessTokenProvider(runner); + runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier()); + } + + @Test + void testPutSalesforceRecord() throws Exception { + MockComponentLog mockComponentLog = new MockComponentLog("id1", "testPutSalesforceRecord"); + InputStream in = readFile("src/test/resources/json/put_records.json"); + + MockFlowFile flowFile = new MockFlowFile(1L); + byte[] fileContent = Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json")); + flowFile.setData(fileContent); + flowFile.putAttributes(Collections.singletonMap("objectType", "Account")); + + MockRecordParser reader = new MockRecordParser(); + reader.addSchemaField("name", RecordFieldType.STRING); + reader.addSchemaField("phone", RecordFieldType.STRING); + reader.addSchemaField("website", RecordFieldType.STRING); + reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING); + reader.addSchemaField("industry", RecordFieldType.STRING); + + reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", "100", "Banking"); + reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", "200", "Banking"); + reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", "300", "Banking"); + reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", "400", "Banking"); + reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", "500", "Banking"); + + reader.createRecordReader(flowFile, in, mockComponentLog); + + runner.addControllerService("reader", reader); + runner.enableControllerService(reader); + + runner.setProperty(PutSalesforceRecord.API_VERSION, VERSION); + runner.setProperty(PutSalesforceRecord.API_URL, BASE_URL); + runner.setProperty(PutSalesforceRecord.RECORD_READER_FACTORY, reader.getIdentifier()); + + + runner.enqueue(flowFile); + runner.run(); + + List<MockFlowFile> results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS); Review Comment: ```suggestion 
List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceRecord.REL_SUCCESS); ``` ########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceRecord.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.salesforce; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.NullSuppression; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.json.OutputGrouping; +import org.apache.nifi.json.WriteJsonResult; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.salesforce.util.RecordExtender; +import org.apache.nifi.processors.salesforce.util.SalesforceRestService; +import org.apache.nifi.schema.access.NopSchemaAccessWriter; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"salesforce", "sobject", "put"}) 
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's" + + " 'objectType' attribute.") +public class PutSalesforceRecord extends AbstractProcessor { + + private static final int MAX_RECORD_COUNT = 200; + + static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() + .name("salesforce-url") + .displayName("URL") + .description( + "The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com") + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder() + .name("salesforce-api-version") + .displayName("API Version") + .description( + "The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions") + .required(true) + .addValidator(StandardValidators.NUMBER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("54.0") + .build(); + + static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder() + .name("read-timeout") + .displayName("Read Timeout") + .description("Maximum time allowed for reading a response from the Salesforce REST API") + .required(true) + .defaultValue("15 s") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("oauth2-access-token-provider") + .displayName("OAuth2 Access Token Provider") + .description( + "Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header") + .identifiesControllerService(OAuth2AccessTokenProvider.class) + 
.required(true) + .build(); + + protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description( + "Specifies the Controller Service to use for parsing incoming data and determining the data's schema") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful execution.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("For FlowFiles created as a result of an execution error.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + API_URL, + API_VERSION, + READ_TIMEOUT, + TOKEN_PROVIDER, + RECORD_READER_FACTORY + )); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_FAILURE + ))); + + private volatile SalesforceRestService salesforceRestService; + private volatile int maxRecordCount; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + maxRecordCount = getMaxRecordCount(); + + String salesforceVersion = context.getProperty(API_VERSION).getValue(); + String baseUrl = context.getProperty(API_URL).getValue(); + OAuth2AccessTokenProvider accessTokenProvider = + context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class); + + salesforceRestService = new SalesforceRestService( + salesforceVersion, + baseUrl, + () -> accessTokenProvider.getAccessDetails().getAccessToken(), + 
context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS) + .intValue() + ); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + String objectType = flowFile.getAttribute("objectType"); + if (objectType == null) { + throw new ProcessException("Salesforce object type not found"); + } + + RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class); + + RecordExtender extender = new RecordExtender(); + + try (InputStream in = session.read(flowFile); + RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger()); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + WriteJsonResult writer = getWriter(reader.getSchema(), extender, out)) { + + AtomicInteger count = new AtomicInteger(); + RecordSchema originalSchema = reader.getSchema(); + Record record; + + while ((record = reader.nextRecord()) != null) { + count.incrementAndGet(); + if (!writer.isActiveRecordSet()) { + writer.beginRecordSet(); + } + + MapRecord extendedRecord = extender.getExtendedRecord(objectType, originalSchema, count.get(), record); + writer.write(extendedRecord); + + if (count.compareAndSet(maxRecordCount, 0)) { Review Comment: `count` is a local variable, cannot be shared among threads. Is there another reason why we need it as an `AtomicInteger`? ########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.salesforce.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; + +import static org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class TestRecordExtender { + + private static RecordExtender EXTENDER; Review Comment: Suggestion: Usually the test subject should not be a constant. Also, I like to call it `testSubject` because it's much easier to identify it that way in the tests. In general it's a good idea to give a name based _not_ on the behaviour of the object (i.e. its type) but on its _purpose_ (in this case its primary purpose is to be tested). -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
