exceptionfactory commented on code in PR #6301:
URL: https://github.com/apache/nifi/pull/6301#discussion_r950160505


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially

Review Comment:
   Is this annotation necessary? It seems safe to have concurrent invocations 
in some scenarios, unless HubSpot only allows one concurrent request per client.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/pom.xml:
##########
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>nifi-hubspot-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-hubspot-nar</artifactId>
+
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hubspot-processors</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services-nar</artifactId>

Review Comment:
   This dependency should be changed to the `nifi-standard-services-api-nar`.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")

Review Comment:
   This should be changed to `access-token`:
   ```suggestion
               .name("access-token")
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/CrmEndpoint.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.DescribedValue;
+
+public enum CrmEndpoint implements DescribedValue {
+
+    COMPANIES(
+            "/crm/v3/objects/companies",
+            "Companies",
+            "In HubSpot, the companies object is a standard CRM object. 
Individual company records can be used to store information about businesses" +
+                    " and organizations within company properties."
+    ),
+    CONTACTS(
+            "/crm/v3/objects/contacts",
+            "Contacts",
+            "In HubSpot, contacts store information about individuals. From 
marketing automation to smart content, the lead-specific data found in" +
+                    " contact records helps users leverage much of HubSpot's 
functionality."
+    ),
+    DEALS(
+            "/crm/v3/objects/deals",
+            "Deals",
+            "In HubSpot, a deal represents an ongoing transaction that a sales 
team is pursuing with a contact or company. It’s tracked through" +
+                    " pipeline stages until won or lost."
+    ),
+    FEEDBACK_SUBMISSIONS(
+            "/crm/v3/objects/feedback_submissions",
+            "Feedback Submissions",
+            "In HubSpot, feedback submissions are an object which stores 
information submitted to a feedback survey. This includes Net Promoter Score 
(NPS)," +
+                    " Customer Satisfaction (CSAT), Customer Effort Score 
(CES) and Custom Surveys."
+    ),
+    LINE_ITEMS(
+            "/crm/v3/objects/line_items",
+            "Line Items",
+            "In HubSpot, line items can be thought of as a subset of products. 
When a product is attached to a deal, it becomes a line item. Line items can" +
+                    " be created that are unique to an individual quote, but 
they will not be added to the product library."
+    ),
+    PRODUCTS(
+            "/crm/v3/objects/products",
+            "Products",
+            "In HubSpot, products represent the goods or services to be sold. 
Building a product library allows the user to quickly add products to deals," +
+                    " generate quotes, and report on product performance."
+    ),
+    TICKETS(
+            "/crm/v3/objects/tickets",
+            "Tickets",
+            "In HubSpot, a ticket represents a customer request for help or 
support."
+    ),
+    QUOTES(
+            "/crm/v3/objects/quotes",
+            "Quotes",
+            "In HubSpot, quotes are used to share pricing information with 
potential buyers."
+    ),
+
+    CALLS(
+            "/crm/v3/objects/calls",
+            "Calls",
+            "Get calls on CRM records and on the calls index page."
+    ),
+    EMAILS(
+            "/crm/v3/objects/emails",
+            "Emails",
+            "Get emails on CRM records."
+    ),
+    MEETINGS(
+            "/crm/v3/objects/meetings",
+            "Meetings",
+            "Get meetings on CRM records."
+    ),
+    NOTES(
+            "/crm/v3/objects/notes",
+            "Notes",
+            "Get notes on CRM records."
+    ),
+    TASKS(
+            "/crm/v3/objects/tasks",
+            "Tasks",
+            "Get tasks on CRM records."
+    ),
+
+    OWNERS(
+            "/crm/v3/owners/",
+            "Owners",
+            "HubSpot uses owners to assign specific users to contacts, 
companies, deals, tickets, or engagements. Any HubSpot user with access to 
contacts" +
+                    " can be assigned as an owner, and multiple owners can be 
assigned to an object by creating a custom property for this purpose."
+    );
+
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    CrmEndpoint(final String value, final String displayName, final String 
description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public AllowableValue getAllowableValue() {
+        return new AllowableValue(value, displayName, description);
+    }

Review Comment:
   This method seems unnecessary, for references in test code, it should be 
sufficient to use `getValue()`.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to get")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to display per page")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
HTTP request.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("In case of HTTP client errors the flowfile will be 
routed to this relationship")
+            .build();
+
+    private static final String API_BASE_URI = "api.hubapi.com";
+    private static final String HTTPS = "https";
+    private static final String CURSOR_PARAMETER = "after";
+    private static final String LIMIT_PARAMETER = "limit";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+
+    private volatile WebClientServiceProvider webClientServiceProvider;
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ACCESS_TOKEN,
+            CRM_ENDPOINT,
+            LIMIT,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+        final String endpoint = context.getProperty(CRM_ENDPOINT).getValue();
+
+        final StateMap state = getStateMap(context);
+        final URI uri = createUri(context, state);
+
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
+        final AtomicInteger objectCountHolder = new AtomicInteger();
+
+        FlowFile flowFile = session.create();
+        flowFile = session.putAttribute(flowFile, "statusCode", 
String.valueOf(response.statusCode()));
+
+        flowFile = session.write(flowFile, out -> {

Review Comment:
   Based on other comments, removing the failure relationship would allow 
FlowFile creation to be moved inside of the conditional check for a successful 
response status code.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to get")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to display per page")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
HTTP request.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("In case of HTTP client errors the flowfile will be 
routed to this relationship")
+            .build();

Review Comment:
   As a source Processor with input forbidden, having a failure relationship 
does not seem to follow the model of other processors. Recommend removing this 
relationship given the general design.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to get")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to display per page")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
HTTP request.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("In case of HTTP client errors the flowfile will be 
routed to this relationship")
+            .build();
+
+    private static final String API_BASE_URI = "api.hubapi.com";
+    private static final String HTTPS = "https";
+    private static final String CURSOR_PARAMETER = "after";
+    private static final String LIMIT_PARAMETER = "limit";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+
+    private volatile WebClientServiceProvider webClientServiceProvider;
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ACCESS_TOKEN,
+            CRM_ENDPOINT,
+            LIMIT,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+        final String endpoint = context.getProperty(CRM_ENDPOINT).getValue();
+
+        final StateMap state = getStateMap(context);
+        final URI uri = createUri(context, state);
+
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
+        final AtomicInteger objectCountHolder = new AtomicInteger();
+
+        FlowFile flowFile = session.create();
+        flowFile = session.putAttribute(flowFile, "statusCode", 
String.valueOf(response.statusCode()));
+
+        flowFile = session.write(flowFile, out -> {
+
+            try (JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
+                 final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName().equals("results")) {
+                        jsonParser.nextToken();
+                        jsonGenerator.copyCurrentStructure(jsonParser);
+                        objectCountHolder.incrementAndGet();
+                    }
+                    String fieldName = jsonParser.getCurrentName();
+                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                        jsonParser.nextToken();
+                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
+                        newStateMap.put(endpoint, jsonParser.getText());
+                        updateState(context, newStateMap);
+                        break;
+                    }
+                }
+            }
+        });
+        if (response.statusCode() >= 400) {
+            if (response.statusCode() == 429) {
+                context.yield();
+                throw new ProcessException("Rate limit exceeded, yielding for 
10 seconds before retrying request.");
+            } else {
+                getLogger().warn("HTTP [{}] client error occurred at endpoint 
[{}]", response.statusCode(), endpoint);
+                session.transfer(flowFile, REL_FAILURE);

Review Comment:
   As mentioned on the relationship, changing the implementation to only create 
a FlowFile on success follows the pattern of other "source" Processors, so this 
could be removed.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to get")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to display per page")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
HTTP request.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("In case of HTTP client errors the flowfile will be 
routed to this relationship")
+            .build();
+
+    private static final String API_BASE_URI = "api.hubapi.com";
+    private static final String HTTPS = "https";
+    private static final String CURSOR_PARAMETER = "after";
+    private static final String LIMIT_PARAMETER = "limit";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+
+    private volatile WebClientServiceProvider webClientServiceProvider;
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ACCESS_TOKEN,
+            CRM_ENDPOINT,
+            LIMIT,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+        final String endpoint = context.getProperty(CRM_ENDPOINT).getValue();
+
+        final StateMap state = getStateMap(context);
+        final URI uri = createUri(context, state);
+
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
+        final AtomicInteger objectCountHolder = new AtomicInteger();
+
+        FlowFile flowFile = session.create();
+        flowFile = session.putAttribute(flowFile, "statusCode", 
String.valueOf(response.statusCode()));
+
+        flowFile = session.write(flowFile, out -> {
+
+            try (JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
+                 final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName().equals("results")) {
+                        jsonParser.nextToken();
+                        jsonGenerator.copyCurrentStructure(jsonParser);
+                        objectCountHolder.incrementAndGet();
+                    }
+                    String fieldName = jsonParser.getCurrentName();
+                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                        jsonParser.nextToken();
+                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
+                        newStateMap.put(endpoint, jsonParser.getText());
+                        updateState(context, newStateMap);
+                        break;
+                    }
+                }
+            }
+        });
+        if (response.statusCode() >= 400) {
+            if (response.statusCode() == 429) {
+                context.yield();
+                throw new ProcessException("Rate limit exceeded, yielding for 
10 seconds before retrying request.");
+            } else {
+                getLogger().warn("HTTP [{}] client error occurred at endpoint 
[{}]", response.statusCode(), endpoint);

Review Comment:
   Recommend a minor adjustment to remove the `[]` characters around the status 
code since it will always be a number, and also adjusting the wording to 
include the complete URI.
   ```suggestion
                   getLogger().warn("HTTP {} error for requested URI [{}]", 
response.statusCode(), uri);
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/docs/org.apache.nifi.processors.hubspot.GetHubSpot/additionalDetails.html:
##########
@@ -0,0 +1,143 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>GetHubSpot</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+<h2>Incremental Loading</h2>
+<p>
+    Some resources can be processed incrementally by NiFi. This means that 
only resources created or modified after the last run
+    time of the processor are displayed. The processor state can be reset in 
the context menu. The following list shows which
+    date-time fields are incremented for which resources.
+<ul>
+    <li>Access
+        <ul>
+            <li>Access Scope: none</li>
+            <li>StoreFront Access Token: none</li>
+        </ul>
+    </li>
+    <li>Analytics
+        <ul>
+            <li>Reports: updated_at_min</li>
+        </ul>
+    </li>
+    <li>Billing
+        <ul>
+            <li>Application Charge: none</li>
+            <li>Application Credit: none</li>
+            <li>Recurring Application Charge: none</li>
+        </ul>
+    </li>
+    <li>Customers
+        <ul>
+            <li>Customers: updated_at_min</li>
+            <li>Customer Saved Searches: none</li>
+        </ul>
+    </li>
+    <li>Discounts
+        <ul>
+            <li>Price Rules: updated_at_min</li>
+        </ul>
+    </li>
+    <li>Events
+        <ul>
+            <li>Events: created_at_min</li>
+        </ul>
+    </li>
+    <li>Inventory
+        <ul>
+            <li>Inventory Levels: updated_at_min</li>
+            <li>Locations: none</li>
+        </ul>
+    </li>
+    <li>Marketing Event
+        <ul>
+            <li>Marketing Events: none</li>
+        </ul>
+    </li>
+    <li>Metafields
+        <ul>
+            <li>Metafields: updated_at_min</li>
+        </ul>
+    </li>
+    <li>Online Store
+        <ul>
+            <li>Blogs: none</li>
+            <li>Comment: none</li>
+            <li>Pages: none</li>
+            <li>Redirects: none</li>
+            <li>Script Tags: updated_at_min</li>
+            <li>Themes: none</li>
+        </ul>
+    </li>
+    <li>Orders
+        <ul>
+            <li>Abandoned Checkouts: updated_at_min</li>
+            <li>Draft Orders: updated_at_min</li>
+            <li>Orders: updated_at_min</li>
+        </ul>
+    </li>
+    <li>Plus
+        <ul>
+            <li>Gift Cards: none</li>
+            <li>Users: none</li>
+        </ul>
+    </li>
+    <li>Product
+        <ul>
+            <li>Collects: none</li>
+            <li>Custom Collections: updated_at_min</li>
+            <li>Products: updated_at_min</li>
+            <li>Smart Collections: updated_at_min</li>
+        </ul>
+    </li>
+    <li>Sales Channels
+        <ul>
+            <li>Collection Listings: none</li>
+            <li>Mobile Platform Applications: none</li>
+            <li>Product Listings: updated_at_min</li>
+            <li>Resource Feedbacks: none</li>
+        </ul>
+    </li>
+    <li>Shipping and Fulfillments
+        <ul>
+            <li>Carrier Services: none</li>
+        </ul>
+    </li>
+    <li>Store Properties
+        <ul>
+            <li>Countries: none</li>
+            <li>Currencies: none</li>
+            <li>Policies: none</li>
+            <li>Shipping Zones: updated_at_min</li>
+            <li>Shop: none</li>
+        </ul>
+    </li>
+    <li>Tender Transactions
+        <ul>
+            <li>Tender Transactions: processed_at_min</li>
+        </ul>
+    </li>
+</ul>
+
+Specific trap type can set in case of Enterprise Specific generic trap type is 
chosen.

Review Comment:
   Does this comment apply to this processor, or should it be removed?



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to get")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to display per page")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
HTTP request.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("In case of HTTP client errors the flowfile will be 
routed to this relationship")
+            .build();
+
+    private static final String API_BASE_URI = "api.hubapi.com";
+    private static final String HTTPS = "https";
+    private static final String CURSOR_PARAMETER = "after";
+    private static final String LIMIT_PARAMETER = "limit";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+
+    private volatile WebClientServiceProvider webClientServiceProvider;
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ACCESS_TOKEN,
+            CRM_ENDPOINT,

Review Comment:
   As mentioned, recommend placing the CRM_ENDPOINT first in the list of 
properties.
   ```suggestion
               CRM_ENDPOINT,
               ACCESS_TOKEN,
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()

Review Comment:
   Recommend moving this property before `ACCESS_TOKEN`.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to get")

Review Comment:
   ```suggestion
               .description("The HubSpot CRM API endpoint to which the 
Processor will send requests")
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to get")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to display per page")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
HTTP request.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("In case of HTTP client errors the flowfile will be 
routed to this relationship")
+            .build();
+
+    private static final String API_BASE_URI = "api.hubapi.com";
+    private static final String HTTPS = "https";
+    private static final String CURSOR_PARAMETER = "after";
+    private static final String LIMIT_PARAMETER = "limit";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+
+    private volatile WebClientServiceProvider webClientServiceProvider;
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ACCESS_TOKEN,
+            CRM_ENDPOINT,
+            LIMIT,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+        final String endpoint = context.getProperty(CRM_ENDPOINT).getValue();
+
+        final StateMap state = getStateMap(context);
+        final URI uri = createUri(context, state);
+
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
+        final AtomicInteger objectCountHolder = new AtomicInteger();
+
+        FlowFile flowFile = session.create();
+        flowFile = session.putAttribute(flowFile, "statusCode", 
String.valueOf(response.statusCode()));
+
+        flowFile = session.write(flowFile, out -> {
+
+            try (JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
+                 final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName().equals("results")) {
+                        jsonParser.nextToken();
+                        jsonGenerator.copyCurrentStructure(jsonParser);
+                        objectCountHolder.incrementAndGet();
+                    }
+                    String fieldName = jsonParser.getCurrentName();
+                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                        jsonParser.nextToken();
+                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
+                        newStateMap.put(endpoint, jsonParser.getText());
+                        updateState(context, newStateMap);
+                        break;
+                    }
+                }
+            }
+        });
+        if (response.statusCode() >= 400) {
+            if (response.statusCode() == 429) {
+                context.yield();
+                throw new ProcessException("Rate limit exceeded, yielding for 
10 seconds before retrying request.");
+            } else {
+                getLogger().warn("HTTP [{}] client error occurred at endpoint 
[{}]", response.statusCode(), endpoint);
+                session.transfer(flowFile, REL_FAILURE);
+            }
+        } else if (objectCountHolder.get() > 0) {
+            session.transfer(flowFile, REL_SUCCESS);
+        } else {
+            getLogger().debug("Empty response when requested HubSpot endpoint: 
[{}]", endpoint);
+        }
+    }
+
+    HttpResponseEntity getHttpResponseEntity(final String accessToken, final 
URI uri) {
+        return webClientServiceProvider.getWebClientService()
+                .get()
+                .uri(uri)
+                .header("Authorization", "Bearer " + accessToken)
+                .retrieve();
+    }
+
+    HttpUriBuilder getBaseUri(final ProcessContext context) {
+        final String path = context.getProperty(CRM_ENDPOINT).getValue();
+        return webClientServiceProvider.getHttpUriBuilder()
+                .scheme(HTTPS)
+                .host(API_BASE_URI)
+                .encodedPath(path);
+    }
+
+    URI createUri(final ProcessContext context, final StateMap state) {

Review Comment:
   It looks like these methods should be marked `private`.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to get")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to display per page")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
HTTP request.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("In case of HTTP client errors the flowfile will be 
routed to this relationship")
+            .build();
+
+    private static final String API_BASE_URI = "api.hubapi.com";
+    private static final String HTTPS = "https";
+    private static final String CURSOR_PARAMETER = "after";
+    private static final String LIMIT_PARAMETER = "limit";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+
+    private volatile WebClientServiceProvider webClientServiceProvider;
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ACCESS_TOKEN,
+            CRM_ENDPOINT,
+            LIMIT,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+        final String endpoint = context.getProperty(CRM_ENDPOINT).getValue();
+
+        final StateMap state = getStateMap(context);
+        final URI uri = createUri(context, state);
+
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
+        final AtomicInteger objectCountHolder = new AtomicInteger();
+
+        FlowFile flowFile = session.create();
+        flowFile = session.putAttribute(flowFile, "statusCode", 
String.valueOf(response.statusCode()));
+
+        flowFile = session.write(flowFile, out -> {
+
+            try (JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
+                 final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName().equals("results")) {
+                        jsonParser.nextToken();
+                        jsonGenerator.copyCurrentStructure(jsonParser);
+                        objectCountHolder.incrementAndGet();
+                    }
+                    String fieldName = jsonParser.getCurrentName();
+                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                        jsonParser.nextToken();
+                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
+                        newStateMap.put(endpoint, jsonParser.getText());
+                        updateState(context, newStateMap);
+                        break;
+                    }
+                }
+            }
+        });
+        if (response.statusCode() >= 400) {
+            if (response.statusCode() == 429) {
+                context.yield();
+                throw new ProcessException("Rate limit exceeded, yielding for 
10 seconds before retrying request.");

Review Comment:
   Although `10 seconds` is the default configuration, this can be changed. 
Instead of throwing an exception, it should be sufficient to log a warning. 
Also recommend including the status code and URI in a warning.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hubspot"})
+@CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
+        + " Supports incremental retrieval: Users can set the \"limit\" 
property which serves as the upper limit of the retrieved objects."
+        + " When this property is set the processor will retrieve new records. 
This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
+        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute."
+        + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without 
duplicating the data.")
+@DefaultSettings(yieldDuration = "10 sec")
+public class GetHubSpot extends AbstractProcessor {
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("admin-api-access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor CRM_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("crm-endpoint")
+            .displayName("HubSpot CRM API Endpoint")
+            .description("The HubSpot CRM API endpoint to get")
+            .required(true)
+            .allowableValues(CrmEndpoint.class)
+            .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("crm-limit")
+            .displayName("Limit")
+            .description("The maximum number of results to display per page")

Review Comment:
   The `display per page` wording sounds more like a user interface, so 
recommend something along the following lines:
   ```suggestion
               .description("The maximum number of results to request for each 
invocation of the Processor")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to