exceptionfactory commented on code in PR #6301:
URL: https://github.com/apache/nifi/pull/6301#discussion_r956421187


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to which the Processor 
will send requests")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to request for each 
invocation of the Processor")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)

Review Comment:
   The check for `isLimitSet` in the `createUri` method implies this property 
is optional, so it looks like `required(true)` should be changed to 
`required(false)`.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to which the Processor 
will send requests")

Review Comment:
   Recommend renaming this property to `CRM Object Type` to align with the 
display names presented when selecting one of the property's allowable values. 
Although the internal value of each option is a path, its display name is the 
object type.
   ```suggestion
       static final PropertyDescriptor CRM_OBJECT_TYPE = new 
PropertyDescriptor.Builder()
               .name("crm-object-type")
               .displayName("CRM Object Type")
               .description("HubSpot CRM Object Type requested")
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to which the Processor 
will send requests")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to request for each 
invocation of the Processor")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
HTTP request.")
+            .build();
+
+    private static final String API_BASE_URI = "api.hubapi.com";
+    private static final String HTTPS = "https";
+    private static final String CURSOR_PARAMETER = "after";
+    private static final String LIMIT_PARAMETER = "limit";
+    private static final int TOO_MANY_REQUESTS = 429;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+
+    private volatile WebClientServiceProvider webClientServiceProvider;
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            CRM_ENDPOINT,
+            ACCESS_TOKEN,
+            LIMIT,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+        final String endpoint = context.getProperty(CRM_ENDPOINT).getValue();
+
+        final StateMap state = getStateMap(context);
+        final URI uri = createUri(context, state);
+
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
+        final AtomicInteger objectCountHolder = new AtomicInteger();
+
+        if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
+            FlowFile flowFile = session.create();
+            flowFile = session.write(flowFile, parseHttpResponse(context, 
endpoint, state, response, objectCountHolder));
+            if (objectCountHolder.get() > 0) {
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                getLogger().debug("Empty response when requested HubSpot 
endpoint: [{}]", endpoint);
+                session.remove(flowFile);
+            }
+        } else if (response.statusCode() >= 400) {

Review Comment:
   This should be changed to a more generalized `else` condition, so that any 
response other than `200 OK` is handled, not only status codes of 400 and 
above.
   ```suggestion
           } else {
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {

Review Comment:
   The current version of this processor is limited to HubSpot 
[CRM](https://developers.hubspot.com/docs/api/crm/understanding-the-crm) 
operations, but there are other parts of the HubSpot API. In light of that 
fact, it seems like the processor should be renamed to `GetHubSpotCRM`, or the 
Object Type property should be renamed to something more general. What do you 
think?



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to which the Processor 
will send requests")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")

Review Comment:
   Recommend renaming this property to `Result Limit` for better clarity of 
function.
   ```suggestion
       static final PropertyDescriptor RESULT_LIMIT = new 
PropertyDescriptor.Builder()
               .name("result-limit")
               .displayName("Result Limit")
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."

Review Comment:
   Recommend adjusting the wording to avoid the quotes and use capitalization 
for the Result Limit property instead:
   ```suggestion
           + " Configuring the Result Limit property enables incremental 
retrieval of results."
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to which the Processor 
will send requests")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to request for each 
invocation of the Processor")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
HTTP request.")
+            .build();
+
+    private static final String API_BASE_URI = "api.hubapi.com";
+    private static final String HTTPS = "https";
+    private static final String CURSOR_PARAMETER = "after";
+    private static final String LIMIT_PARAMETER = "limit";
+    private static final int TOO_MANY_REQUESTS = 429;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+
+    private volatile WebClientServiceProvider webClientServiceProvider;
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            CRM_ENDPOINT,
+            ACCESS_TOKEN,
+            LIMIT,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+        final String endpoint = context.getProperty(CRM_ENDPOINT).getValue();
+
+        final StateMap state = getStateMap(context);
+        final URI uri = createUri(context, state);
+
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
+        final AtomicInteger objectCountHolder = new AtomicInteger();
+
+        if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
+            FlowFile flowFile = session.create();
+            flowFile = session.write(flowFile, parseHttpResponse(context, 
endpoint, state, response, objectCountHolder));
+            if (objectCountHolder.get() > 0) {
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                getLogger().debug("Empty response when requested HubSpot 
endpoint: [{}]", endpoint);
+                session.remove(flowFile);
+            }
+        } else if (response.statusCode() >= 400) {
+            if (response.statusCode() == TOO_MANY_REQUESTS) {
+                context.yield();
+                throw new ProcessException(String.format("Rate limit exceeded, 
yielding before retrying request. HTTP %d error for requested URI [%s]", 
response.statusCode(), uri));
+            } else {
+                getLogger().warn("HTTP {} error for requested URI [{}]", 
response.statusCode(), uri);

Review Comment:
   It would be helpful to read the HTTP response body into a string and include 
it in the warning, since it contains useful troubleshooting details. This would 
also ensure that the response body InputStream is consumed and closed in failure 
scenarios.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")

Review Comment:
   The last two lines of this description (starting with "State is stored 
across the cluster") can be removed, since that behavior is implied by cluster 
state storage for all components.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/docs/org.apache.nifi.processors.hubspot.GetHubSpot/additionalDetails.html:
##########
@@ -0,0 +1,143 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>GetHubSpot</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+<h2>Incremental Loading</h2>

Review Comment:
   Recommend adding a section at the beginning with a link to the HubSpot 
[Authentication Methods](https://developers.hubspot.com/docs/api/intro-to-auth) 
documentation, which describes how to obtain an Access Token.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to