dam4rus commented on code in PR #6301: URL: https://github.com/apache/nifi/pull/6301#discussion_r954889290
########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; 
+import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +@PrimaryNodeOnly +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"hubspot"}) +@CapabilityDescription("Retrieves JSON data from a private HubSpot application." + + " Supports incremental retrieval: Users can set the \"limit\" property which serves as the upper limit of the retrieved objects." + + " When this property is set the processor will retrieve new records. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request." + + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute." 
+ + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@DefaultSettings(yieldDuration = "10 sec") +public class GetHubSpot extends AbstractProcessor { + + static final PropertyDescriptor CRM_ENDPOINT = new PropertyDescriptor.Builder() + .name("crm-endpoint") + .displayName("HubSpot CRM API Endpoint") + .description("The HubSpot CRM API endpoint to which the Processor will send requests") + .required(true) + .allowableValues(CrmEndpoint.class) + .build(); + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("access-token") + .displayName("Access Token") + .description("Access Token to authenticate requests") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("crm-limit") + .displayName("Limit") + .description("The maximum number of results to request for each invocation of the Processor") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Controller service for HTTP client operations") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() Review Comment: 
Unused field ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.HttpUrl; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.web.client.StandardHttpUriBuilder; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + 
+import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class GetHubSpotTest { + + public static final String BASE_URL = "/test/hubspot"; + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static MockWebServer server; + private static HttpUrl baseUrl; + + @BeforeEach + void setup() throws IOException { + server = new MockWebServer(); + server.start(); + baseUrl = server.url(BASE_URL); + } + + @AfterEach + void tearDown() throws IOException { + if (server != null) { + server.shutdown(); + server = null; + } + } + + @Test + void testLimitIsAddedToUrl() throws InitializationException, InterruptedException, IOException { + + final String response = getResourceAsString("response-without-paging-cursor.json"); + server.enqueue(new MockResponse().setResponseCode(200).setBody(response)); + + final StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider(); Review Comment: This code block is duplicated in every test case. 
You can extract it to a method or set it up in `void setup()` ```java final StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider(); final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot(); TestRunner runner = TestRunners.newTestRunner(mockGetHubSpot); runner.addControllerService("standardWebClientServiceProvider", standardWebClientServiceProvider); runner.enableControllerService(standardWebClientServiceProvider); runner.setProperty(GetHubSpot.WEB_CLIENT_SERVICE_PROVIDER, standardWebClientServiceProvider.getIdentifier()); runner.setProperty(GetHubSpot.ACCESS_TOKEN, "testToken"); runner.setProperty(GetHubSpot.CRM_ENDPOINT, CrmEndpoint.COMPANIES.getValue()); runner.setProperty(GetHubSpot.LIMIT, "1"); ``` ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +@PrimaryNodeOnly +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"hubspot"}) +@CapabilityDescription("Retrieves JSON data from a private HubSpot application." + + " Supports incremental retrieval: Users can set the \"limit\" property which serves as the upper limit of the retrieved objects." + + " When this property is set the processor will retrieve new records. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request." + + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute." + + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@DefaultSettings(yieldDuration = "10 sec") +public class GetHubSpot extends AbstractProcessor { + + static final PropertyDescriptor CRM_ENDPOINT = new PropertyDescriptor.Builder() + .name("crm-endpoint") + .displayName("HubSpot CRM API Endpoint") + .description("The HubSpot CRM API endpoint to which the Processor will send requests") + .required(true) + .allowableValues(CrmEndpoint.class) + .build(); + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("access-token") + .displayName("Access Token") + .description("Access Token to authenticate requests") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("crm-limit") + 
.displayName("Limit") + .description("The maximum number of results to request for each invocation of the Processor") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Controller service for HTTP client operations") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("In case of HTTP client errors the flowfile will be routed to this relationship") + .build(); + + private static final String API_BASE_URI = "api.hubapi.com"; + private static final String HTTPS = "https"; + private static final String CURSOR_PARAMETER = "after"; + private static final String LIMIT_PARAMETER = "limit"; + private static final int TOO_MANY_REQUESTS = 429; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); + + private volatile WebClientServiceProvider webClientServiceProvider; + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + CRM_ENDPOINT, + ACCESS_TOKEN, + LIMIT, + WEB_CLIENT_SERVICE_PROVIDER + )); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>( + Collections.singletonList( + REL_SUCCESS + ))); + + @OnScheduled + public void onScheduled(final ProcessContext context) { + webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String accessToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String endpoint = context.getProperty(CRM_ENDPOINT).getValue(); + + final StateMap state = getStateMap(context); + final URI uri = createUri(context, state); + + final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri); + final AtomicInteger objectCountHolder = new AtomicInteger(); + + if (response.statusCode() == HttpResponseStatus.OK.getCode()) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, parseHttpResponse(context, endpoint, state, response, objectCountHolder)); + if (objectCountHolder.get() > 0) { + session.transfer(flowFile, REL_SUCCESS); + } else { + getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint); + } + } else if (response.statusCode() >= 400) { + if (response.statusCode() == TOO_MANY_REQUESTS) { + context.yield(); + throw new ProcessException(String.format("Rate limit exceeded, yielding before retrying request. 
HTTP %d error for requested URI [%s]", response.statusCode(), uri)); + } else { + getLogger().warn("HTTP {} error for requested URI [{}]", response.statusCode(), uri); + } + } + } + + private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + return out -> { + try (JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); Review Comment: Missing `final` ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +@PrimaryNodeOnly +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"hubspot"}) +@CapabilityDescription("Retrieves JSON data from a private HubSpot application." + + " Supports incremental retrieval: Users can set the \"limit\" property which serves as the upper limit of the retrieved objects." + + " When this property is set the processor will retrieve new records. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request." + + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute." + + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@DefaultSettings(yieldDuration = "10 sec") +public class GetHubSpot extends AbstractProcessor { + + static final PropertyDescriptor CRM_ENDPOINT = new PropertyDescriptor.Builder() + .name("crm-endpoint") + .displayName("HubSpot CRM API Endpoint") + .description("The HubSpot CRM API endpoint to which the Processor will send requests") + .required(true) + .allowableValues(CrmEndpoint.class) + .build(); + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("access-token") + .displayName("Access Token") + .description("Access Token to authenticate requests") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("crm-limit") + 
.displayName("Limit") + .description("The maximum number of results to request for each invocation of the Processor") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Controller service for HTTP client operations") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("In case of HTTP client errors the flowfile will be routed to this relationship") + .build(); + + private static final String API_BASE_URI = "api.hubapi.com"; + private static final String HTTPS = "https"; + private static final String CURSOR_PARAMETER = "after"; + private static final String LIMIT_PARAMETER = "limit"; + private static final int TOO_MANY_REQUESTS = 429; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); + + private volatile WebClientServiceProvider webClientServiceProvider; + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + CRM_ENDPOINT, + ACCESS_TOKEN, + LIMIT, + WEB_CLIENT_SERVICE_PROVIDER + )); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>( + Collections.singletonList( + REL_SUCCESS + ))); + + @OnScheduled + public void onScheduled(final ProcessContext context) { + webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String accessToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String endpoint = context.getProperty(CRM_ENDPOINT).getValue(); + + final StateMap state = getStateMap(context); + final URI uri = createUri(context, state); + + final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri); + final AtomicInteger objectCountHolder = new AtomicInteger(); + + if (response.statusCode() == HttpResponseStatus.OK.getCode()) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, parseHttpResponse(context, endpoint, state, response, objectCountHolder)); + if (objectCountHolder.get() > 0) { + session.transfer(flowFile, REL_SUCCESS); + } else { + getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint); + } + } else if (response.statusCode() >= 400) { + if (response.statusCode() == TOO_MANY_REQUESTS) { + context.yield(); + throw new ProcessException(String.format("Rate limit exceeded, yielding before retrying request. 
HTTP %d error for requested URI [%s]", response.statusCode(), uri)); + } else { + getLogger().warn("HTTP {} error for requested URI [{}]", response.statusCode(), uri); + } + } + } + + private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + return out -> { + try (JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); + final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { + while (jsonParser.nextToken() != null) { + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() + .equals("results")) { + jsonParser.nextToken(); + jsonGenerator.copyCurrentStructure(jsonParser); + objectCountHolder.incrementAndGet(); + } + String fieldName = jsonParser.getCurrentName(); Review Comment: Missing `final` ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +@PrimaryNodeOnly +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"hubspot"}) +@CapabilityDescription("Retrieves JSON data from a private HubSpot application." + + " Supports incremental retrieval: Users can set the \"limit\" property which serves as the upper limit of the retrieved objects." + + " When this property is set the processor will retrieve new records. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request." + + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute." + + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@DefaultSettings(yieldDuration = "10 sec") +public class GetHubSpot extends AbstractProcessor { + + static final PropertyDescriptor CRM_ENDPOINT = new PropertyDescriptor.Builder() + .name("crm-endpoint") + .displayName("HubSpot CRM API Endpoint") + .description("The HubSpot CRM API endpoint to which the Processor will send requests") + .required(true) + .allowableValues(CrmEndpoint.class) + .build(); + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("access-token") + .displayName("Access Token") + .description("Access Token to authenticate requests") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("crm-limit") + 
.displayName("Limit") + .description("The maximum number of results to request for each invocation of the Processor") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Controller service for HTTP client operations") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("In case of HTTP client errors the flowfile will be routed to this relationship") + .build(); + + private static final String API_BASE_URI = "api.hubapi.com"; + private static final String HTTPS = "https"; + private static final String CURSOR_PARAMETER = "after"; + private static final String LIMIT_PARAMETER = "limit"; + private static final int TOO_MANY_REQUESTS = 429; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); + + private volatile WebClientServiceProvider webClientServiceProvider; + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + CRM_ENDPOINT, + ACCESS_TOKEN, + LIMIT, + WEB_CLIENT_SERVICE_PROVIDER + )); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>( + Collections.singletonList( + REL_SUCCESS + ))); + + @OnScheduled + public void onScheduled(final ProcessContext context) { + webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String accessToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String endpoint = context.getProperty(CRM_ENDPOINT).getValue(); + + final StateMap state = getStateMap(context); + final URI uri = createUri(context, state); + + final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri); + final AtomicInteger objectCountHolder = new AtomicInteger(); + + if (response.statusCode() == HttpResponseStatus.OK.getCode()) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, parseHttpResponse(context, endpoint, state, response, objectCountHolder)); + if (objectCountHolder.get() > 0) { + session.transfer(flowFile, REL_SUCCESS); + } else { + getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint); + } + } else if (response.statusCode() >= 400) { + if (response.statusCode() == TOO_MANY_REQUESTS) { + context.yield(); + throw new ProcessException(String.format("Rate limit exceeded, yielding before retrying request. 
HTTP %d error for requested URI [%s]", response.statusCode(), uri)); + } else { + getLogger().warn("HTTP {} error for requested URI [{}]", response.statusCode(), uri); + } + } + } + + private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + return out -> { + try (JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); + final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { + while (jsonParser.nextToken() != null) { + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() + .equals("results")) { + jsonParser.nextToken(); + jsonGenerator.copyCurrentStructure(jsonParser); + objectCountHolder.incrementAndGet(); + } + String fieldName = jsonParser.getCurrentName(); + if (CURSOR_PARAMETER.equals(fieldName)) { + jsonParser.nextToken(); + Map<String, String> newStateMap = new HashMap<>(state.toMap()); + newStateMap.put(endpoint, jsonParser.getText()); + updateState(context, newStateMap); + break; + } + } + } + }; + } + + HttpUriBuilder getBaseUri(final ProcessContext context) { + final String path = context.getProperty(CRM_ENDPOINT).getValue(); + return webClientServiceProvider.getHttpUriBuilder() + .scheme(HTTPS) + .host(API_BASE_URI) + .encodedPath(path); + } + + private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) { + return webClientServiceProvider.getWebClientService() + .get() + .uri(uri) + .header("Authorization", "Bearer " + accessToken) + .retrieve(); + } + + private URI createUri(final ProcessContext context, final StateMap state) { + final String path = context.getProperty(CRM_ENDPOINT).getValue(); + final HttpUriBuilder uriBuilder = getBaseUri(context); + + final boolean isLimitSet = context.getProperty(LIMIT).isSet(); + if (isLimitSet) { + final String limit = context.getProperty(LIMIT).getValue(); Review Comment: Since `LIMIT` 
supports EL, shouldn't `PropertyValue.evaluateAttributeExpressions()` be called before getting the value? ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.HttpUrl; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.web.client.StandardHttpUriBuilder; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class GetHubSpotTest { + + public static final String BASE_URL = "/test/hubspot"; + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static MockWebServer server; + private static HttpUrl baseUrl; + + @BeforeEach + void setup() throws IOException { + server = new MockWebServer(); + server.start(); + baseUrl = server.url(BASE_URL); + } + + @AfterEach + void tearDown() throws IOException { + if (server != null) { + server.shutdown(); + server = null; + } + } + + @Test + void testLimitIsAddedToUrl() throws InitializationException, InterruptedException, IOException { + + final String response = getResourceAsString("response-without-paging-cursor.json"); + server.enqueue(new MockResponse().setResponseCode(200).setBody(response)); + + final 
StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider(); + final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot(); + + TestRunner runner = TestRunners.newTestRunner(mockGetHubSpot); + runner.addControllerService("standardWebClientServiceProvider", standardWebClientServiceProvider); + runner.enableControllerService(standardWebClientServiceProvider); + + runner.setProperty(GetHubSpot.WEB_CLIENT_SERVICE_PROVIDER, standardWebClientServiceProvider.getIdentifier()); + runner.setProperty(GetHubSpot.ACCESS_TOKEN, "testToken"); + runner.setProperty(GetHubSpot.CRM_ENDPOINT, CrmEndpoint.COMPANIES.getValue()); + runner.setProperty(GetHubSpot.LIMIT, "1"); + + runner.run(1); + + RecordedRequest request = server.takeRequest(); + assertEquals(BASE_URL + "?limit=1", request.getPath()); + } + + @Test + void testPageCursorIsAddedToUrlFromState() throws InitializationException, InterruptedException, IOException { + + final String response = getResourceAsString("response-without-paging-cursor.json"); + server.enqueue(new MockResponse().setBody(response)); + + final StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider(); + final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot(); + + TestRunner runner = TestRunners.newTestRunner(mockGetHubSpot); + runner.addControllerService("standardWebClientServiceProvider", standardWebClientServiceProvider); + runner.enableControllerService(standardWebClientServiceProvider); + + runner.setProperty(GetHubSpot.WEB_CLIENT_SERVICE_PROVIDER, standardWebClientServiceProvider.getIdentifier()); + runner.setProperty(GetHubSpot.ACCESS_TOKEN, "testToken"); + runner.setProperty(GetHubSpot.CRM_ENDPOINT, CrmEndpoint.COMPANIES.getValue()); + runner.setProperty(GetHubSpot.LIMIT, "1"); + + runner.getStateManager().setState(Collections.singletonMap(CrmEndpoint.COMPANIES.getValue(), "12345"), Scope.CLUSTER); + + runner.run(1); + + RecordedRequest request = 
server.takeRequest(); + assertEquals(BASE_URL + "?limit=1&after=12345", request.getPath()); + } + + @Test + void testFlowFileContainsResultsArray() throws InitializationException, IOException { + + final String response = getResourceAsString("response-with-paging-cursor.json"); Review Comment: Please extract this to a constant `"response-with-paging-cursor.json"` ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +@PrimaryNodeOnly +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"hubspot"}) +@CapabilityDescription("Retrieves JSON data from a private HubSpot application." + + " Supports incremental retrieval: Users can set the \"limit\" property which serves as the upper limit of the retrieved objects." + + " When this property is set the processor will retrieve new records. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request." + + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute." + + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@DefaultSettings(yieldDuration = "10 sec") +public class GetHubSpot extends AbstractProcessor { + + static final PropertyDescriptor CRM_ENDPOINT = new PropertyDescriptor.Builder() + .name("crm-endpoint") + .displayName("HubSpot CRM API Endpoint") + .description("The HubSpot CRM API endpoint to which the Processor will send requests") + .required(true) + .allowableValues(CrmEndpoint.class) + .build(); + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("access-token") + .displayName("Access Token") + .description("Access Token to authenticate requests") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("crm-limit") + 
.displayName("Limit") + .description("The maximum number of results to request for each invocation of the Processor") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Controller service for HTTP client operations") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("In case of HTTP client errors the flowfile will be routed to this relationship") + .build(); + + private static final String API_BASE_URI = "api.hubapi.com"; + private static final String HTTPS = "https"; + private static final String CURSOR_PARAMETER = "after"; + private static final String LIMIT_PARAMETER = "limit"; + private static final int TOO_MANY_REQUESTS = 429; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); + + private volatile WebClientServiceProvider webClientServiceProvider; + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + CRM_ENDPOINT, + ACCESS_TOKEN, + LIMIT, + WEB_CLIENT_SERVICE_PROVIDER + )); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>( + Collections.singletonList( + REL_SUCCESS + ))); + + @OnScheduled + public void onScheduled(final ProcessContext context) { + webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String accessToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String endpoint = context.getProperty(CRM_ENDPOINT).getValue(); + + final StateMap state = getStateMap(context); + final URI uri = createUri(context, state); + + final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri); + final AtomicInteger objectCountHolder = new AtomicInteger(); + + if (response.statusCode() == HttpResponseStatus.OK.getCode()) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, parseHttpResponse(context, endpoint, state, response, objectCountHolder)); + if (objectCountHolder.get() > 0) { + session.transfer(flowFile, REL_SUCCESS); + } else { + getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint); + } + } else if (response.statusCode() >= 400) { + if (response.statusCode() == TOO_MANY_REQUESTS) { + context.yield(); + throw new ProcessException(String.format("Rate limit exceeded, yielding before retrying request. 
HTTP %d error for requested URI [%s]", response.statusCode(), uri)); + } else { + getLogger().warn("HTTP {} error for requested URI [{}]", response.statusCode(), uri); + } + } + } + + private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + return out -> { + try (JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); + final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { Review Comment: Indentation is 5 spaces instead of 4 ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.HttpUrl; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.web.client.StandardHttpUriBuilder; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class GetHubSpotTest { + + public static final String BASE_URL = "/test/hubspot"; + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static MockWebServer server; + private static HttpUrl baseUrl; + + @BeforeEach + void setup() throws IOException { + server = new MockWebServer(); + server.start(); + baseUrl = server.url(BASE_URL); + } + + @AfterEach + void tearDown() throws IOException { + if (server != null) { + server.shutdown(); + server = null; + } + } + + @Test + void testLimitIsAddedToUrl() throws InitializationException, InterruptedException, IOException { + + final String response = getResourceAsString("response-without-paging-cursor.json"); Review Comment: Please extract this to a constant `"response-without-paging-cursor.json"` ########## 
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import 
org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +@PrimaryNodeOnly +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"hubspot"}) +@CapabilityDescription("Retrieves JSON data from a private HubSpot application." + + " Supports incremental retrieval: Users can set the \"limit\" property which serves as the upper limit of the retrieved objects." + + " When this property is set the processor will retrieve new records. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request." + + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute." 
+ + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@DefaultSettings(yieldDuration = "10 sec") +public class GetHubSpot extends AbstractProcessor { + + static final PropertyDescriptor CRM_ENDPOINT = new PropertyDescriptor.Builder() + .name("crm-endpoint") + .displayName("HubSpot CRM API Endpoint") + .description("The HubSpot CRM API endpoint to which the Processor will send requests") + .required(true) + .allowableValues(CrmEndpoint.class) + .build(); + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("access-token") + .displayName("Access Token") + .description("Access Token to authenticate requests") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("crm-limit") + .displayName("Limit") + .description("The maximum number of results to request for each invocation of the Processor") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Controller service for HTTP client operations") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + 
.name("failure") + .description("In case of HTTP client errors the flowfile will be routed to this relationship") + .build(); + + private static final String API_BASE_URI = "api.hubapi.com"; + private static final String HTTPS = "https"; + private static final String CURSOR_PARAMETER = "after"; + private static final String LIMIT_PARAMETER = "limit"; + private static final int TOO_MANY_REQUESTS = 429; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); + + private volatile WebClientServiceProvider webClientServiceProvider; + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + CRM_ENDPOINT, + ACCESS_TOKEN, + LIMIT, + WEB_CLIENT_SERVICE_PROVIDER + )); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>( Review Comment: You can use `Collections.singleton(REL_SUCCESS)` to instantiate a singleton immutable set -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
