tpalfy commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1026780290


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class RecordExtender {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    static final SimpleRecordSchema ATTRIBUTES_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("type", RecordFieldType.STRING.getDataType()),
+            new RecordField("referenceId", RecordFieldType.STRING.getDataType())
+    ));
+
+    public ObjectNode getWrappedRecordsJson(ByteArrayOutputStream out) throws IOException {
+        ObjectNode root = MAPPER.createObjectNode();
+        JsonNode jsonNode = MAPPER.readTree(out.toByteArray());
+        root.set("records", jsonNode);
+        return root;
+    }
+
+    public MapRecord getExtendedRecord(String objectType, RecordSchema originalSchema, int count, Record record) {
+        SimpleRecordSchema extendedSchema = getExtendedSchema(originalSchema);
+
+        Set<String> rawFieldNames = record.getRawFieldNames();
+        Map<String, Object> objectMap = rawFieldNames.stream()
+                .collect(Collectors.toMap(Function.identity(), record::getValue));
+
+        Map<String, Object> attributesMap = new HashMap<>();
+        attributesMap.put("type", objectType);
+        attributesMap.put("referenceId", count);
+
+        MapRecord attributesRecord = new MapRecord(ATTRIBUTES_RECORD_SCHEMA, attributesMap);
+
+        objectMap.put("attributes", attributesRecord);
+
+        return new MapRecord(extendedSchema, objectMap);
+    }
+
+    public SimpleRecordSchema getExtendedSchema(RecordSchema original) {

Review Comment:
   This would be better done once, for example in the constructor.
   
   That is a challenge because the extender has to be instantiated inside the try-with-resources block (it needs the reader's schema) without implementing AutoCloseable, but there's a trick we can use:
   ```java
           RecordExtender extender;
           
           try (InputStream in = session.read(flowFile);
                RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                WriteJsonResult writer = getWriter(extender = new RecordExtender(reader.getSchema()), out)) {
   ```
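
   A self-contained sketch of the same trick, in case it helps to try it in isolation. `Extender`, `SketchWriter` and `TryWithResourcesSketch` are hypothetical stand-ins, not classes from the PR; the point is only that a non-AutoCloseable local can be assigned inside a resource initializer and then used in the try body.

   ```java
   // Stand-in for RecordExtender: derives its "extended schema" once, in the constructor.
   final class Extender {
       final String extendedSchema;

       Extender(String originalSchema) {
           this.extendedSchema = originalSchema + "+attributes";
       }
   }

   // Stand-in for the writer: the only AutoCloseable involved.
   final class SketchWriter implements AutoCloseable {
       final Extender extender;

       SketchWriter(Extender extender) {
           this.extender = extender;
       }

       @Override
       public void close() {
           // nothing to release in this sketch
       }
   }

   final class TryWithResourcesSketch {
       static String run(String originalSchema) {
           Extender extender; // declared outside, not a resource itself

           // The assignment expression both initializes 'extender' and passes it to the writer.
           try (SketchWriter writer = new SketchWriter(extender = new Extender(originalSchema))) {
               return extender.extendedSchema; // definitely assigned at this point
           }
       }
   }
   ```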



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java:
##########
@@ -84,11 +86,28 @@ public InputStream getNextRecords(String nextRecordsUrl) {
         return request(request);
     }
 
+    public InputStream postRecord(String sObjectApiName, String body) {
+        String url = baseUrl + "/services/data/v" + version + "/composite/tree/" + sObjectApiName;
+
+        HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
+                .build();
+
+        final RequestBody requestBody = RequestBody.create(body, MediaType.parse("application/json"));
+
+        Request request = new Request.Builder()
+                .addHeader("Authorization", "Bearer " + accessTokenProvider.get())
+                .url(httpUrl)
+                .post(requestBody)
+                .build();
+
+        return request(request);
+    }
+
     private InputStream request(Request request) {
         Response response = null;
         try {
             response = httpClient.newCall(request).execute();
-            if (response.code() != 200) {
+            if (response.code() < 200 || response.code() >= 400) {

Review Comment:
   I'm not sure about widening the check this far: the new condition also accepts 3xx responses. I assume the new endpoint returns a 201, so I think we should allow only that (apart from 200), or at most any 2xx.
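
   A minimal sketch of the two stricter options, assuming the new endpoint does answer with 201 on success. `StatusCheck` and its method names are hypothetical; for the 2xx variant OkHttp's own `response.isSuccessful()` performs the same range check.

   ```java
   final class StatusCheck {

       // Option 1: accept only 200 (existing endpoints) and 201 (assumed for the new endpoint).
       static boolean isOkOrCreated(int code) {
           return code == 200 || code == 201;
       }

       // Option 2: accept any 2xx status.
       static boolean isSuccess(int code) {
           return code >= 200 && code < 300;
       }
   }
   ```

   With either option a 3xx response is treated as a failure, unlike with `response.code() < 200 || response.code() >= 400`.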



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceRecord.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute.")
+public class PutSalesforceRecord extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-url")
+            .displayName("URL")
+            .description(
+                    "The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com")
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder()
+            .name("salesforce-api-version")
+            .displayName("API Version")
+            .description(
+                    "The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions")
+            .required(true)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("54.0")
+            .build();
+
+    static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("read-timeout")
+            .displayName("Read Timeout")
+            .description("Maximum time allowed for reading a response from the Salesforce REST API")
+            .required(true)
+            .defaultValue("15 s")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+            .name("oauth2-access-token-provider")
+            .displayName("OAuth2 Access Token Provider")
+            .description(
+                    "Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");

Review Comment:
   Please add a `@ReadsAttribute` annotation on the class with a description 
that explains what this is supposed to be.
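
   A sketch of what the annotation could look like. To keep the snippet compilable without the NiFi API on the classpath, `ReadsAttribute` is declared locally here as a stand-in for NiFi's `@ReadsAttribute`; in the processor the real annotation would be imported instead. The description wording and the class name `PutSalesforceRecordSketch` are only suggestions.

   ```java
   import java.lang.annotation.ElementType;
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.annotation.Target;

   // Local stand-in so the sketch compiles on its own; not the real NiFi annotation.
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.TYPE)
   @interface ReadsAttribute {
       String attribute();
       String description();
   }

   // The attribute name matches the one read in onTrigger; the description text is a suggestion.
   @ReadsAttribute(attribute = "objectType",
           description = "The API name of the Salesforce sObject the records are posted to, for example Account.")
   final class PutSalesforceRecordSketch {
   }
   ```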



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceRecord.java:
##########
@@ -0,0 +1,237 @@
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found");

Review Comment:
   For easier error handling, the message should state that the object type was not found _among the incoming flowfile attributes_. Given that, it's okay to refer to the actual attribute name in the message.
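
   A minimal sketch of such a message, with assumptions: `ObjectTypeCheck` and `ATTR_OBJECT_TYPE` are hypothetical names, a plain `Map` stands in for the flowfile attributes, and `IllegalStateException` stands in for `ProcessException` so the snippet compiles on its own.

   ```java
   import java.util.Map;

   final class ObjectTypeCheck {

       static final String ATTR_OBJECT_TYPE = "objectType";

       // Returns the object type or fails with a message that names the missing attribute.
       static String requireObjectType(Map<String, String> flowFileAttributes) {
           String objectType = flowFileAttributes.get(ATTR_OBJECT_TYPE);
           if (objectType == null) {
               throw new IllegalStateException("Salesforce object type not found among the incoming FlowFile attributes."
                       + " Expected attribute: '" + ATTR_OBJECT_TYPE + "'");
           }
           return objectType;
       }
   }
   ```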



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceRecord.java:
##########
@@ -0,0 +1,237 @@
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found");
+        }
+
+        RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender = new RecordExtender();
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(reader.getSchema(), extender, out)) {
+
+            AtomicInteger count = new AtomicInteger();
+            RecordSchema originalSchema = reader.getSchema();
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                count.incrementAndGet();
+                if (!writer.isActiveRecordSet()) {
+                    writer.beginRecordSet();
+                }
+
+                MapRecord extendedRecord = extender.getExtendedRecord(objectType, originalSchema, count.get(), record);
+                writer.write(extendedRecord);
+
+                if (count.compareAndSet(maxRecordCount, 0)) {
+                    processRecords(objectType, out, writer, extender);
+                    out.reset();
+                }
+            }
+
+            if (writer.isActiveRecordSet()) {
+                processRecords(objectType, out, writer, extender);
+            }
+
+        } catch (Exception ex) {

Review Comment:
   For error handling purposes it would be better to distinguish between IOException, MalformedRecordException and SchemaNotFoundException.
   
   In all 3 cases the flowfile can go to REL_FAILURE, but the message could be different.
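
   A rough sketch of the separate catch blocks. Everything here is illustrative: the two exception classes are local stand-ins for NiFi's exceptions of the same name, `RecordTask` and `FailureMessages` are hypothetical, and the message texts are only suggestions. In the processor each branch would log its message and route the flowfile to REL_FAILURE instead of returning a string.

   ```java
   import java.io.IOException;

   // Local stand-ins so the sketch compiles without the NiFi API.
   class MalformedRecordException extends Exception {
       MalformedRecordException(String message) {
           super(message);
       }
   }

   class SchemaNotFoundException extends Exception {
       SchemaNotFoundException(String message) {
           super(message);
       }
   }

   // Hypothetical wrapper around the read/extend/post loop.
   interface RecordTask {
       void run() throws IOException, MalformedRecordException, SchemaNotFoundException;
   }

   final class FailureMessages {

       // Runs the task and returns a message that depends on the kind of failure.
       static String runAndDescribe(RecordTask task) {
           try {
               task.run();
               return "Success";
           } catch (MalformedRecordException e) {
               return "Malformed record in the incoming flowfile: " + e.getMessage();
           } catch (SchemaNotFoundException e) {
               return "Schema not found for the incoming flowfile: " + e.getMessage();
           } catch (IOException e) {
               return "I/O error while processing the flowfile: " + e.getMessage();
           }
       }
   }
   ```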


