tpalfy commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1028016451


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static 
org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestRecordExtender {
+
+    private static RecordExtender EXTENDER;
+    private static ObjectMapper OBJECT_MAPPER;
+    private static RecordSchema TEST_RECORD_SCHEMA;
+    private static RecordSchema EXTENDED_TEST_RECORD_SCHEMA;
+
+    @BeforeAll
+    public static void setup() {
+        EXTENDER = new RecordExtender();
+        OBJECT_MAPPER = new ObjectMapper();
+        TEST_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("testRecordField1", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("testRecordField2", 
RecordFieldType.STRING.getDataType())
+        ));
+        EXTENDED_TEST_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("testRecordField1", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("testRecordField2", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("attributes", 
RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
+                        new RecordField("type", 
RecordFieldType.STRING.getDataType()),
+                        new RecordField("referenceId", 
RecordFieldType.STRING.getDataType()
+                        )))))
+        ));
+    }
+
+    @Test
+    void testGetWrappedRecordJson() throws IOException {
+        ObjectNode testNode = OBJECT_MAPPER.createObjectNode();
+        testNode.put("testField1", "testValue1");
+        testNode.put("testField2", "testValue2");
+
+        ObjectNode expectedWrappedNode = OBJECT_MAPPER.createObjectNode();
+        expectedWrappedNode.set("records", testNode);
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        out.write(testNode.toString().getBytes());
+
+        ObjectNode actualWrappedJson = EXTENDER.getWrappedRecordsJson(out);
+
+        assertEquals(expectedWrappedNode, actualWrappedJson);
+    }
+
+    @Test
+    void testGetExtendedSchema() {
+        final SimpleRecordSchema actualRecordSchema = 
EXTENDER.getExtendedSchema(TEST_RECORD_SCHEMA);
+
+        assertEquals(EXTENDED_TEST_RECORD_SCHEMA, actualRecordSchema);

Review Comment:
   ```suggestion
           final SimpleRecordSchema actualExtendedSchema = 
EXTENDER.getExtendedSchema(TEST_RECORD_SCHEMA);
   
           assertEquals(EXPECTED_EXTENDED_SCHEMA, actualExtendedSchema);
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new 
CustomPutSalesforceRecord(maxRecordCount);
+
+        runner = TestRunners.newTestRunner(putSalesforceRecord);
+
+        StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = 
initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, 
oauth2AccessTokenProvider.getIdentifier());

Review Comment:
   ```suggestion
           runner.setProperty(PutSalesforceRecord.TOKEN_PROVIDER, 
oauth2AccessTokenProvider.getIdentifier());
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new 
CustomPutSalesforceRecord(maxRecordCount);

Review Comment:
   Suggestion: We don't necessarily need a new class for this. `Custom...` is 
not a particularly descriptive name. An anonymous class shows the intent in a 
more streamlined way in my opinion.
   ```suggestion
           Processor putSalesforceRecord = new PutSalesforceRecord() {
               @Override
               int getMaxRecordCount() {
                   return 2;
               }
           };
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new 
CustomPutSalesforceRecord(maxRecordCount);
+
+        runner = TestRunners.newTestRunner(putSalesforceRecord);
+
+        StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = 
initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, 
oauth2AccessTokenProvider.getIdentifier());
+    }
+
+    @Test
+    void testPutSalesforceRecord() throws Exception {
+        MockComponentLog mockComponentLog = new MockComponentLog("id1", 
"testPutSalesforceRecord");
+        InputStream in = readFile("src/test/resources/json/put_records.json");
+
+        MockFlowFile flowFile = new MockFlowFile(1L);
+        byte[] fileContent = 
Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json"));
+        flowFile.setData(fileContent);
+        flowFile.putAttributes(Collections.singletonMap("objectType", 
"Account"));

Review Comment:
   I think this part (as well as the later 
`reader.createRecordReader(flowFile, in, mockComponentLog);` call) is not 
needed. When you mock the records, then the input data is going to be ignored.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static 
org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestRecordExtender {
+
+    private static RecordExtender EXTENDER;
+    private static ObjectMapper OBJECT_MAPPER;
+    private static RecordSchema TEST_RECORD_SCHEMA;
+    private static RecordSchema EXTENDED_TEST_RECORD_SCHEMA;

Review Comment:
   ```suggestion
       private static RecordSchema ORIGINAL_SCHEMA;
       private static RecordSchema EXPECTED_EXTENDED_SCHEMA;
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new 
CustomPutSalesforceRecord(maxRecordCount);
+
+        runner = TestRunners.newTestRunner(putSalesforceRecord);
+
+        StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = 
initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, 
oauth2AccessTokenProvider.getIdentifier());
+    }
+
+    @Test
+    void testPutSalesforceRecord() throws Exception {
+        MockComponentLog mockComponentLog = new MockComponentLog("id1", 
"testPutSalesforceRecord");
+        InputStream in = readFile("src/test/resources/json/put_records.json");
+
+        MockFlowFile flowFile = new MockFlowFile(1L);
+        byte[] fileContent = 
Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json"));
+        flowFile.setData(fileContent);
+        flowFile.putAttributes(Collections.singletonMap("objectType", 
"Account"));
+
+        MockRecordParser reader = new MockRecordParser();
+        reader.addSchemaField("name", RecordFieldType.STRING);
+        reader.addSchemaField("phone", RecordFieldType.STRING);
+        reader.addSchemaField("website", RecordFieldType.STRING);
+        reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING);
+        reader.addSchemaField("industry", RecordFieldType.STRING);
+
+        reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", 
"100", "Banking");
+        reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", 
"200", "Banking");
+        reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", 
"300", "Banking");
+        reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", 
"400", "Banking");
+        reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", 
"500", "Banking");
+
+        reader.createRecordReader(flowFile, in, mockComponentLog);
+
+        runner.addControllerService("reader", reader);
+        runner.enableControllerService(reader);
+
+        runner.setProperty(PutSalesforceRecord.API_VERSION, VERSION);
+        runner.setProperty(PutSalesforceRecord.API_URL, BASE_URL);
+        runner.setProperty(PutSalesforceRecord.RECORD_READER_FACTORY, 
reader.getIdentifier());
+
+
+        runner.enqueue(flowFile);

Review Comment:
   With my previous suggestion.
   ```suggestion
           runner.enqueue("", Collections.singletonMap("objectType", 
"Account"));
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new 
CustomPutSalesforceRecord(maxRecordCount);
+
+        runner = TestRunners.newTestRunner(putSalesforceRecord);
+
+        StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = 
initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, 
oauth2AccessTokenProvider.getIdentifier());
+    }
+
+    @Test
+    void testPutSalesforceRecord() throws Exception {
+        MockComponentLog mockComponentLog = new MockComponentLog("id1", 
"testPutSalesforceRecord");
+        InputStream in = readFile("src/test/resources/json/put_records.json");
+
+        MockFlowFile flowFile = new MockFlowFile(1L);
+        byte[] fileContent = 
Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json"));
+        flowFile.setData(fileContent);
+        flowFile.putAttributes(Collections.singletonMap("objectType", 
"Account"));
+
+        MockRecordParser reader = new MockRecordParser();
+        reader.addSchemaField("name", RecordFieldType.STRING);
+        reader.addSchemaField("phone", RecordFieldType.STRING);
+        reader.addSchemaField("website", RecordFieldType.STRING);
+        reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING);
+        reader.addSchemaField("industry", RecordFieldType.STRING);
+
+        reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", 
"100", "Banking");
+        reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", 
"200", "Banking");
+        reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", 
"300", "Banking");
+        reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", 
"400", "Banking");
+        reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", 
"500", "Banking");
+
+        reader.createRecordReader(flowFile, in, mockComponentLog);
+
+        runner.addControllerService("reader", reader);
+        runner.enableControllerService(reader);
+
+        runner.setProperty(PutSalesforceRecord.API_VERSION, VERSION);
+        runner.setProperty(PutSalesforceRecord.API_URL, BASE_URL);
+        runner.setProperty(PutSalesforceRecord.RECORD_READER_FACTORY, 
reader.getIdentifier());
+
+
+        runner.enqueue(flowFile);
+        runner.run();
+
+        List<MockFlowFile> results = 
runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);

Review Comment:
   ```suggestion
           List<MockFlowFile> results = 
runner.getFlowFilesForRelationship(PutSalesforceRecord.REL_SUCCESS);
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceRecord.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the 
Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute.")
+public class PutSalesforceRecord extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-url")
+            .displayName("URL")
+            .description(
+                    "The URL for the Salesforce REST API including the domain 
without additional path information, such as 
https://MyDomainName.my.salesforce.com")
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new 
PropertyDescriptor.Builder()
+            .name("salesforce-api-version")
+            .displayName("API Version")
+            .description(
+                    "The version number of the Salesforce REST API appended to 
the URL after the services/data path. See Salesforce documentation for 
supported versions")
+            .required(true)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("54.0")
+            .build();
+
+    static final PropertyDescriptor READ_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("read-timeout")
+            .displayName("Read Timeout")
+            .description("Maximum time allowed for reading a response from the 
Salesforce REST API")
+            .required(true)
+            .defaultValue("15 s")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("oauth2-access-token-provider")
+            .displayName("OAuth2 Access Token Provider")
+            .description(
+                    "Service providing OAuth2 Access Tokens for authenticating 
using the HTTP Authorization Header")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution 
error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                
context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                
context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found");
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender = new RecordExtender();
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(reader.getSchema(), extender, 
out)) {
+
+            AtomicInteger count = new AtomicInteger();
+            RecordSchema originalSchema = reader.getSchema();
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                count.incrementAndGet();
+                if (!writer.isActiveRecordSet()) {
+                    writer.beginRecordSet();
+                }
+
+                MapRecord extendedRecord = 
extender.getExtendedRecord(objectType, originalSchema, count.get(), record);
+                writer.write(extendedRecord);
+
+                if (count.compareAndSet(maxRecordCount, 0)) {

Review Comment:
   `count` is a local variable, cannot be shared among threads. Is there 
another reason why we need it as an `AtomicInteger`?



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static 
org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestRecordExtender {
+
+    private static RecordExtender EXTENDER;

Review Comment:
   Suggestion
   Usually the test subject should not be a constant.
   
   Also I like to call it `testSubject` because it's much easier to identify it 
that way in the tests.
   In general it's a good idea to give names _not_ based on the behaviour of the 
object (i.e. its type) but based on its _purpose_ (in this case its primary purpose 
is to be tested).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to