exceptionfactory commented on a change in pull request #5802:
URL: https://github.com/apache/nifi/pull/5802#discussion_r815930762



##########
File path: 
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.function.Supplier;
+
+public class SalesforceRestService {
+    private final String version;
+    private final String baseUrl;
+    private final Supplier<String> accessTokenProvider;
+
+    private final OkHttpClient httpClient = new OkHttpClient.Builder().build();

Review comment:
       This is useful as a default configuration, but it would be helpful to 
make some of the timeout properties configurable.
   
   This default configuration also relies on the system default trust store.
   
   The [Certificates and 
Keys](https://help.salesforce.com/s/articleView?id=sf.security_keys_about.htm&type=5)
 section of the Salesforce documentation describes the ability to support 
mutual TLS authentication.  This could be supported through the use of a 
`RestrictedSSLContextService` in the processor, and passing the 
`SSLSocketFactory` and `X509TrustManager` to the `OkHttpClient.Builder`.

##########
File path: 
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySObject.java
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import 
org.apache.nifi.processors.salesforce.util.SalesforceToNifiSchemaConverter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.DateTimeUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"salesforce", "sobject", "soql", "query", "select"})
+@CapabilityDescription("Retrieves records from a Salesforce SObject. Users can 
add arbitrary filter conditions by setting the 'Custom WHERE Condition' 
property."
+        + " Supports incremental retrieval: users can define a field in the 
'Age Field' property that will be used to determine when the record was 
created."
+        + " When this property is set the processor will retrieve new records. 
It's also possible to define an initial cutoff value for the age, fitering out 
all older records"
+        + " even for the first run. This processor is intended to be run on 
the Primary Node only."
+        + " FlowFile attribute 'record.count' indicates how many records were 
retrieved and written to the output.")
+@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, 
after performing a query the time of execution is stored. Subsequent queries 
will be augmented"
+        + " with an additional condition so that only records that are newer 
than the stored execution time (adjusted with the optional value of 'Age 
Delay') will be retrieved."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile.")
+})
+public class QuerySObject extends AbstractProcessor {
+    static final PropertyDescriptor SOBJECT_NAME = new 
PropertyDescriptor.Builder()
+            .name("sobject-name")
+            .displayName("SObject Name")
+            .description("The name of the sobject to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELD_NAMES = new 
PropertyDescriptor.Builder()
+            .name("field-names")
+            .displayName("Field Names")
+            .description("A coma-separated list of field names to be used in 
the query.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new 
PropertyDescriptor.Builder()
+            .name("custom-where-condition")
+            .displayName("Custom WHERE Condition")
+            .description("A custom expression to be added in the WHERE clause 
of the query.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AGE_FIELD = new 
PropertyDescriptor.Builder()
+            .name("age-field")
+            .displayName("Age Field")
+            .description("The name of a TIMESTAMP field that will be used to 
limit all and filter already retrieved records."
+                    + " Only records that are older than the previous run time 
of this processor will be retrieved."
+            )
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor INITIAL_AGE_FILTER = new 
PropertyDescriptor.Builder()
+            .name("initial-age-filter")
+            .displayName("Initial Age Filter")
+            .description("When 'Age Field' is set the value of this property 
will serve as a filter when this processor runs the first time."
+                    + " Only records that are older than this value be 
retrieved."
+            )
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AGE_DELAY = new 
PropertyDescriptor.Builder()
+            .name("age-delay")
+            .displayName("Age Delay")
+            .description("When 'Age Field' is set the age-based filter will be 
adjusted by this amount."
+                    + " Only records that are older than the previous run time 
of this processor, by at least this amount, will be retrieved."
+            )
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 s")
+            .build();
+
+    static final PropertyDescriptor BASE_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-base-url")
+            .displayName("Base URL")
+            .description("")
+            .required(true)
+            .addValidator(Validator.VALID)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new 
PropertyDescriptor.Builder()
+            .name("salesforce-api-version")
+            .displayName("API Version")
+            .description("")

Review comment:
       A description should be added.

##########
File path: 
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySObject.java
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import 
org.apache.nifi.processors.salesforce.util.SalesforceToNifiSchemaConverter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.DateTimeUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"salesforce", "sobject", "soql", "query", "select"})
+@CapabilityDescription("Retrieves records from a Salesforce SObject. Users can 
add arbitrary filter conditions by setting the 'Custom WHERE Condition' 
property."
+        + " Supports incremental retrieval: users can define a field in the 
'Age Field' property that will be used to determine when the record was 
created."
+        + " When this property is set the processor will retrieve new records. 
It's also possible to define an initial cutoff value for the age, fitering out 
all older records"
+        + " even for the first run. This processor is intended to be run on 
the Primary Node only."
+        + " FlowFile attribute 'record.count' indicates how many records were 
retrieved and written to the output.")
+@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, 
after performing a query the time of execution is stored. Subsequent queries 
will be augmented"
+        + " with an additional condition so that only records that are newer 
than the stored execution time (adjusted with the optional value of 'Age 
Delay') will be retrieved."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile.")
+})
+public class QuerySObject extends AbstractProcessor {
+    static final PropertyDescriptor SOBJECT_NAME = new 
PropertyDescriptor.Builder()
+            .name("sobject-name")
+            .displayName("SObject Name")
+            .description("The name of the sobject to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELD_NAMES = new 
PropertyDescriptor.Builder()
+            .name("field-names")
+            .displayName("Field Names")
+            .description("A coma-separated list of field names to be used in 
the query.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new 
PropertyDescriptor.Builder()
+            .name("custom-where-condition")
+            .displayName("Custom WHERE Condition")
+            .description("A custom expression to be added in the WHERE clause 
of the query.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AGE_FIELD = new 
PropertyDescriptor.Builder()
+            .name("age-field")
+            .displayName("Age Field")
+            .description("The name of a TIMESTAMP field that will be used to 
limit all and filter already retrieved records."
+                    + " Only records that are older than the previous run time 
of this processor will be retrieved."
+            )
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor INITIAL_AGE_FILTER = new 
PropertyDescriptor.Builder()
+            .name("initial-age-filter")
+            .displayName("Initial Age Filter")
+            .description("When 'Age Field' is set the value of this property 
will serve as a filter when this processor runs the first time."
+                    + " Only records that are older than this value be 
retrieved."
+            )
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AGE_DELAY = new 
PropertyDescriptor.Builder()
+            .name("age-delay")
+            .displayName("Age Delay")
+            .description("When 'Age Field' is set the age-based filter will be 
adjusted by this amount."
+                    + " Only records that are older than the previous run time 
of this processor, by at least this amount, will be retrieved."
+            )
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 s")
+            .build();
+
+    static final PropertyDescriptor BASE_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-base-url")
+            .displayName("Base URL")
+            .description("")

Review comment:
       A description should be added, or this builder method should be removed.

##########
File path: 
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToNifiSchemaConverter.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.camel.component.salesforce.api.dto.SObjectField;
+import org.apache.camel.component.salesforce.api.utils.JsonUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SalesforceToNifiSchemaConverter {
+
+    private final String dateFormat;
+    private final String dateTimeFormat;
+    private final String timeFormat;
+
+    public SalesforceToNifiSchemaConverter(String dateFormat, String 
dateTimeFormat, String timeFormat) {
+        this.dateFormat = dateFormat;
+        this.dateTimeFormat = dateTimeFormat;
+        this.timeFormat = timeFormat;
+    }
+
+    public RecordSchema convertSchema(final String 
describeSOjbectResultJsonString, final String fieldNamesOfInterest) throws 
IOException {
+
+        final ObjectMapper objectMapper = JsonUtils.createObjectMapper();

Review comment:
       Instances of `ObjectMapper` are thread-safe, so this could be declared 
as a member variable.
   
   The list of dependencies also includes the Java JSON API. Is there a 
particular reason for using that API as well as Jackson in this module?

##########
File path: 
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySObject.java
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import 
org.apache.nifi.processors.salesforce.util.SalesforceToNifiSchemaConverter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.DateTimeUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"salesforce", "sobject", "soql", "query", "select"})
+@CapabilityDescription("Retrieves records from a Salesforce SObject. Users can 
add arbitrary filter conditions by setting the 'Custom WHERE Condition' 
property."
+        + " Supports incremental retrieval: users can define a field in the 
'Age Field' property that will be used to determine when the record was 
created."
+        + " When this property is set the processor will retrieve new records. 
It's also possible to define an initial cutoff value for the age, fitering out 
all older records"
+        + " even for the first run. This processor is intended to be run on 
the Primary Node only."
+        + " FlowFile attribute 'record.count' indicates how many records were 
retrieved and written to the output.")
+@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, 
after performing a query the time of execution is stored. Subsequent queries 
will be augmented"
+        + " with an additional condition so that only records that are newer 
than the stored execution time (adjusted with the optional value of 'Age 
Delay') will be retrieved."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile.")
+})
+public class QuerySObject extends AbstractProcessor {
+    static final PropertyDescriptor SOBJECT_NAME = new 
PropertyDescriptor.Builder()
+            .name("sobject-name")
+            .displayName("SObject Name")
+            .description("The name of the sobject to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELD_NAMES = new 
PropertyDescriptor.Builder()
+            .name("field-names")
+            .displayName("Field Names")
+            .description("A coma-separated list of field names to be used in 
the query.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new 
PropertyDescriptor.Builder()
+            .name("custom-where-condition")
+            .displayName("Custom WHERE Condition")
+            .description("A custom expression to be added in the WHERE clause 
of the query.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AGE_FIELD = new 
PropertyDescriptor.Builder()
+            .name("age-field")
+            .displayName("Age Field")
+            .description("The name of a TIMESTAMP field that will be used to 
limit all and filter already retrieved records."
+                    + " Only records that are older than the previous run time 
of this processor will be retrieved."
+            )
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor INITIAL_AGE_FILTER = new 
PropertyDescriptor.Builder()
+            .name("initial-age-filter")
+            .displayName("Initial Age Filter")
+            .description("When 'Age Field' is set the value of this property 
will serve as a filter when this processor runs the first time."
+                    + " Only records that are older than this value be 
retrieved."
+            )
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor AGE_DELAY = new 
PropertyDescriptor.Builder()
+            .name("age-delay")
+            .displayName("Age Delay")
+            .description("When 'Age Field' is set the age-based filter will be 
adjusted by this amount."
+                    + " Only records that are older than the previous run time 
of this processor, by at least this amount, will be retrieved."
+            )
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 s")
+            .build();
+
+    static final PropertyDescriptor BASE_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-base-url")
+            .displayName("Base URL")
+            .description("")
+            .required(true)
+            .addValidator(Validator.VALID)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new 
PropertyDescriptor.Builder()
+            .name("salesforce-api-version")
+            .displayName("API Version")
+            .description("")
+            .required(true)
+            .addValidator(Validator.VALID)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("46.0")
+            .build();
+
+    static final PropertyDescriptor AUTH_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("auth-service")
+            .displayName("Auth service")

Review comment:
       This display name could be expanded for greater clarity. Adding a 
description would also be helpful.
   ```suggestion
               .displayName("OAuth2 Access Token Provider")
   ```

##########
File path: 
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml
##########
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+--><project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-salesforce-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.16.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-salesforce-processors</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>3.13.1</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.json</groupId>
+            <artifactId>javax.json-api</artifactId>
+            <version>1.0</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>3.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-salesforce</artifactId>
+            <version>3.12.0</version>

Review comment:
       It looks like this version could still be upgraded.

##########
File path: 
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToNifiSchemaConverter.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.camel.component.salesforce.api.dto.SObjectField;
+import org.apache.camel.component.salesforce.api.utils.JsonUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SalesforceToNifiSchemaConverter {

Review comment:
       Minor naming suggestion: what do you think about renaming this to 
`SalesforceToRecordSchemaConverter`? That would more closely match the 
functionality.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to