tpalfy commented on code in PR #6303:
URL: https://github.com/apache/nifi/pull/6303#discussion_r978013940


##########
nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/main/resources/docs/org.apache.nifi.shopify.processors.GetShopify/additionalDetails.html:
##########
@@ -0,0 +1,148 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>GetShopify</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+<h2>Setting Up a Custom App</h2>
+<p>
+    Follow the <a 
href="https://help.shopify.com/en/manual/apps/custom-apps">Shopify tutorial</a> 
to enable and create
+    private apps, set API Scopes and generate API tokens.
+</p>
+<h2>Incremental Loading</h2>
+<p>
+    Some resources can be processed incrementally by NiFi. This means that 
only resources created or modified after the
+    last run time of the processor are displayed. The processor state can be 
reset in the context menu. The following
+    list shows which date-time fields are incremented for which resources.
+<ul>
+    <li>Access
+        <ul>
+            <li>Access Scope: none</li>
+            <li>StoreFront Access Token: none</li>
+        </ul>
+    </li>
+    <li>Analytics
+        <ul>
+            <li>Reports: updated_at_min</li>
+        </ul>
+    </li>
+    <li>Billing
+        <ul>
+            <li>Application Charge: none</li>
+            <li>Application Credit: none</li>
+            <li>Recurring Application Charge: none</li>
+        </ul>
+    </li>
+    <li>Customers
+        <ul>
+            <li>Customers: updated_at_min</li>
+            <li>Customer Saved Searches: none</li>
+        </ul>
+    </li>
+    <li>Discounts
+        <ul>
+            <li>Price Rules: updated_at_min</li>
+        </ul>
+    </li>
+    <li>Events
+        <ul>
+            <li>Events: created_at_min</li>
+        </ul>
+    </li>
+    <li>Inventory
+        <ul>
+            <li>Inventory Levels: updated_at_min</li>
+            <li>Locations: none</li>
+        </ul>
+    </li>
+    <li>Marketing Event
+        <ul>
+            <li>Marketing Events: none</li>
+        </ul>
+    </li>
+    <li>Metafields

Review Comment:
   Some resources listed here (e.g. `Metafields`) cannot actually be selected 
on the Processor itself.



##########
nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/main/java/org/apache/nifi/processors/shopify/GetShopify.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.shopify;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.shopify.model.IncrementalLoadingParameter;
+import org.apache.nifi.processors.shopify.model.ResourceType;
+import org.apache.nifi.processors.shopify.model.ShopifyResource;
+import org.apache.nifi.processors.shopify.rest.ShopifyRestService;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"shopify"})
+@Stateful(scopes = Scope.CLUSTER, description =
+        "For a few resources the processors support incremental loading. The 
list of the resources with the supported parameters" +
+                " can be found in additional details.")
+@CapabilityDescription("Retrieves object from a custom Shopify store. The 
processor yield time must be set to the account's rate limit accordingly.")
+public class GetShopify extends AbstractProcessor {
+
+    static final PropertyDescriptor STORE_DOMAIN = new 
PropertyDescriptor.Builder()
+            .name("store-domain")
+            .displayName("Store Domain")
+            .description("The domain of the Shopify store, e.g. 
nifistore.myshopify.com")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new 
PropertyDescriptor.Builder()
+            .name("api-version")
+            .displayName("API Version")
+            .description("The Shopify REST API version")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("2022-10")
+            .build();
+
+    static final PropertyDescriptor OBJECT_CATEGORY = new 
PropertyDescriptor.Builder()
+            .name("object-category")
+            .displayName("Object Category")
+            .description("Shopify object category")
+            .required(true)
+            .allowableValues(ResourceType.class)
+            .build();
+
+    static final PropertyDescriptor RESULT_LIMIT = new 
PropertyDescriptor.Builder()
+            .name("result-limit")
+            .displayName("Result Limit")
+            .description("The maximum number of results to request for each 
invocation of the Processor")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new 
PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried 
objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were 
created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new 
PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be 
adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an 
ending timestamp of 12:30:45 would be changed to 12:30:35.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INITIAL_INCREMENTAL_FILTER = new 
PropertyDescriptor.Builder()
+            .name("initial-incremental-filter")
+            .displayName("Initial Incremental Start Time")
+            .description("This property specifies the start time as Epoch Time 
that the processor applies when running the first request.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    public static final PropertyDescriptor WEB_CLIENT_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .required(true)
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
query.")
+            .build();
+
+    private static final Map<ResourceType, PropertyDescriptor> propertyMap = 
new EnumMap<>(ResourceType.class);
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
createPropertyDescriptors();
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    private static List<PropertyDescriptor> createPropertyDescriptors() {
+        final List<PropertyDescriptor> resourceDescriptors = 
Arrays.stream(ResourceType.values())
+                .map(resourceType -> {
+                    final PropertyDescriptor resourceDescriptor = new 
PropertyDescriptor.Builder()
+                            .name(resourceType.getValue())
+                            .displayName(resourceType.getPropertyDisplayName())
+                            .description(resourceType.getPropertyDescription())
+                            .required(true)
+                            .dependsOn(OBJECT_CATEGORY, 
resourceType.getValue())
+                            
.allowableValues(resourceType.getResourcesAsAllowableValues())
+                            
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                            .build();
+                    propertyMap.put(resourceType, resourceDescriptor);
+                    return resourceDescriptor;
+                })
+                .collect(Collectors.toList());
+        final List<PropertyDescriptor> propertyDescriptors = new 
ArrayList<>(Arrays.asList(
+                STORE_DOMAIN,
+                ACCESS_TOKEN,
+                API_VERSION,
+                OBJECT_CATEGORY
+        ));
+        propertyDescriptors.addAll(resourceDescriptors);
+        propertyDescriptors.addAll(Arrays.asList(
+                RESULT_LIMIT,
+                IS_INCREMENTAL,
+                INCREMENTAL_DELAY,
+                INITIAL_INCREMENTAL_FILTER,
+                WEB_CLIENT_PROVIDER)
+        );
+        return Collections.unmodifiableList(propertyDescriptors);
+    }
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final int TOO_MANY_REQUESTS = 429;
+    private static final Pattern CURSOR_PATTERN = Pattern.compile("<([^<]*)>; 
rel=\"next\"");
+
+    private volatile ShopifyRestService shopifyRestService;
+    private volatile ShopifyResource shopifyResource;
+    private volatile String resourceName;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final WebClientServiceProvider webClientServiceProvider =
+                
context.getProperty(WEB_CLIENT_PROVIDER).asControllerService(WebClientServiceProvider.class);
+        final WebClientService webClientService = 
webClientServiceProvider.getWebClientService();
+        final HttpUriBuilder uriBuilder = 
webClientServiceProvider.getHttpUriBuilder();
+
+        final String apiVersion = context.getProperty(API_VERSION).getValue();
+        final String baseUrl = context.getProperty(STORE_DOMAIN).getValue();
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+
+        final String category = 
context.getProperty(OBJECT_CATEGORY).getValue();
+        final ResourceType resourceType = ResourceType.valueOf(category);
+        resourceName = 
context.getProperty(propertyMap.get(resourceType)).getValue();
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+
+        shopifyResource = resourceType.getResource(resourceName);
+
+        shopifyRestService = getShopifyRestService(webClientService, 
uriBuilder, apiVersion, baseUrl, accessToken, resourceName,
+                limit, shopifyResource.getIncrementalLoadingParameter());
+    }
+
+    ShopifyRestService getShopifyRestService(final WebClientService 
webClientService, final HttpUriBuilder uriBuilder,
+                                             final String apiVersion, final 
String baseUrl, final String accessToken, final String resourceName,
+                                             final String limit, final 
IncrementalLoadingParameter incrementalLoadingParameter) {
+        return new ShopifyRestService(
+                webClientService,
+                uriBuilder,
+                apiVersion,
+                baseUrl,
+                accessToken,
+                resourceName,
+                limit,
+                incrementalLoadingParameter
+        );
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final StateMap state = getState(context);
+        final boolean isIncremental = 
context.getProperty(IS_INCREMENTAL).asBoolean();
+        final String initialStartTime = 
context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue();
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        String lastExecutionTime = state.get(resourceName);
+        if (lastExecutionTime == null && initialStartTime != null) {
+            lastExecutionTime = initialStartTime;
+        }
+
+        Instant now = getCurrentExecutionTime();
+        if (incrDelayMs != null) {
+            now = now.minus(incrDelayMs, ChronoUnit.MILLIS);
+        }
+        final String currentExecutionTime = now.toString();
+
+        String cursor = null;
+        do {
+            final AtomicInteger objectCountHolder = new AtomicInteger();
+            try (HttpResponseEntity response = 
shopifyRestService.getShopifyObjects(isIncremental, lastExecutionTime, 
currentExecutionTime, cursor)) {
+                cursor = getPageCursor(response);
+                if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
+                    FlowFile flowFile = session.create();
+                    flowFile = session.write(flowFile, 
parseHttpResponse(response, objectCountHolder));
+                    if (objectCountHolder.get() > 0) {
+                        session.transfer(flowFile, REL_SUCCESS);
+                        if (cursor == null && isIncremental) {

Review Comment:
   ```suggestion
                           if (cursor == null && isIncremental && 
shopifyResource.isPaginationSupported()) {
   ```



##########
nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/main/java/org/apache/nifi/processors/shopify/GetShopify.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.shopify;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.shopify.model.IncrementalLoadingParameter;
+import org.apache.nifi.processors.shopify.model.ResourceType;
+import org.apache.nifi.processors.shopify.model.ShopifyResource;
+import org.apache.nifi.processors.shopify.rest.ShopifyRestService;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpResponseStatus;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"shopify"})
+@Stateful(scopes = Scope.CLUSTER, description =
+        "For a few resources the processors support incremental loading. The 
list of the resources with the supported parameters" +
+                " can be found in additional details.")
+@CapabilityDescription("Retrieves object from a custom Shopify store. The 
processor yield time must be set to the account's rate limit accordingly.")
+public class GetShopify extends AbstractProcessor {
+
+    static final PropertyDescriptor STORE_DOMAIN = new 
PropertyDescriptor.Builder()
+            .name("store-domain")
+            .displayName("Store Domain")
+            .description("The domain of the Shopify store, e.g. 
nifistore.myshopify.com")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Access Token to authenticate requests")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new 
PropertyDescriptor.Builder()
+            .name("api-version")
+            .displayName("API Version")
+            .description("The Shopify REST API version")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("2022-10")
+            .build();
+
+    static final PropertyDescriptor OBJECT_CATEGORY = new 
PropertyDescriptor.Builder()
+            .name("object-category")
+            .displayName("Object Category")
+            .description("Shopify object category")
+            .required(true)
+            .allowableValues(ResourceType.class)
+            .build();
+
+    static final PropertyDescriptor RESULT_LIMIT = new 
PropertyDescriptor.Builder()
+            .name("result-limit")
+            .displayName("Result Limit")
+            .description("The maximum number of results to request for each 
invocation of the Processor")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new 
PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried 
objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were 
created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new 
PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be 
adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an 
ending timestamp of 12:30:45 would be changed to 12:30:35.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INITIAL_INCREMENTAL_FILTER = new 
PropertyDescriptor.Builder()
+            .name("initial-incremental-filter")
+            .displayName("Initial Incremental Start Time")
+            .description("This property specifies the start time as Epoch Time 
that the processor applies when running the first request.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    public static final PropertyDescriptor WEB_CLIENT_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Controller service for HTTP client operations")
+            .required(true)
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
query.")
+            .build();
+
+    private static final Map<ResourceType, PropertyDescriptor> propertyMap = 
new EnumMap<>(ResourceType.class);
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
createPropertyDescriptors();
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    private static List<PropertyDescriptor> createPropertyDescriptors() {
+        final List<PropertyDescriptor> resourceDescriptors = 
Arrays.stream(ResourceType.values())
+                .map(resourceType -> {
+                    final PropertyDescriptor resourceDescriptor = new 
PropertyDescriptor.Builder()
+                            .name(resourceType.getValue())
+                            .displayName(resourceType.getPropertyDisplayName())
+                            .description(resourceType.getPropertyDescription())
+                            .required(true)
+                            .dependsOn(OBJECT_CATEGORY, 
resourceType.getValue())
+                            
.allowableValues(resourceType.getResourcesAsAllowableValues())
+                            
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                            .build();
+                    propertyMap.put(resourceType, resourceDescriptor);
+                    return resourceDescriptor;
+                })
+                .collect(Collectors.toList());
+        final List<PropertyDescriptor> propertyDescriptors = new 
ArrayList<>(Arrays.asList(
+                STORE_DOMAIN,
+                ACCESS_TOKEN,
+                API_VERSION,
+                OBJECT_CATEGORY
+        ));
+        propertyDescriptors.addAll(resourceDescriptors);
+        propertyDescriptors.addAll(Arrays.asList(
+                RESULT_LIMIT,
+                IS_INCREMENTAL,
+                INCREMENTAL_DELAY,
+                INITIAL_INCREMENTAL_FILTER,
+                WEB_CLIENT_PROVIDER)
+        );
+        return Collections.unmodifiableList(propertyDescriptors);
+    }
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final int TOO_MANY_REQUESTS = 429;
+    private static final Pattern CURSOR_PATTERN = Pattern.compile("<([^<]*)>; 
rel=\"next\"");
+
+    private volatile ShopifyRestService shopifyRestService;
+    private volatile ShopifyResource shopifyResource;
+    private volatile String resourceName;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final WebClientServiceProvider webClientServiceProvider =
+                
context.getProperty(WEB_CLIENT_PROVIDER).asControllerService(WebClientServiceProvider.class);
+        final WebClientService webClientService = 
webClientServiceProvider.getWebClientService();
+        final HttpUriBuilder uriBuilder = 
webClientServiceProvider.getHttpUriBuilder();
+
+        final String apiVersion = context.getProperty(API_VERSION).getValue();
+        final String baseUrl = context.getProperty(STORE_DOMAIN).getValue();
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+
+        final String category = 
context.getProperty(OBJECT_CATEGORY).getValue();
+        final ResourceType resourceType = ResourceType.valueOf(category);
+        resourceName = 
context.getProperty(propertyMap.get(resourceType)).getValue();
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+
+        shopifyResource = resourceType.getResource(resourceName);
+
+        shopifyRestService = getShopifyRestService(webClientService, 
uriBuilder, apiVersion, baseUrl, accessToken, resourceName,
+                limit, shopifyResource.getIncrementalLoadingParameter());
+    }
+
+    ShopifyRestService getShopifyRestService(final WebClientService 
webClientService, final HttpUriBuilder uriBuilder,
+                                             final String apiVersion, final 
String baseUrl, final String accessToken, final String resourceName,
+                                             final String limit, final 
IncrementalLoadingParameter incrementalLoadingParameter) {
+        return new ShopifyRestService(
+                webClientService,
+                uriBuilder,
+                apiVersion,
+                baseUrl,
+                accessToken,
+                resourceName,
+                limit,
+                incrementalLoadingParameter
+        );
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final StateMap state = getState(context);
+        final boolean isIncremental = 
context.getProperty(IS_INCREMENTAL).asBoolean();
+        final String initialStartTime = 
context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue();
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        String lastExecutionTime = state.get(resourceName);
+        if (lastExecutionTime == null && initialStartTime != null) {
+            lastExecutionTime = initialStartTime;
+        }
+
+        Instant now = getCurrentExecutionTime();
+        if (incrDelayMs != null) {
+            now = now.minus(incrDelayMs, ChronoUnit.MILLIS);
+        }
+        final String currentExecutionTime = now.toString();
+
+        String cursor = null;
+        do {
+            final AtomicInteger objectCountHolder = new AtomicInteger();
+            try (HttpResponseEntity response = 
shopifyRestService.getShopifyObjects(isIncremental, lastExecutionTime, 
currentExecutionTime, cursor)) {
+                cursor = getPageCursor(response);
+                if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
+                    FlowFile flowFile = session.create();
+                    flowFile = session.write(flowFile, 
parseHttpResponse(response, objectCountHolder));
+                    if (objectCountHolder.get() > 0) {
+                        session.transfer(flowFile, REL_SUCCESS);
+                        if (cursor == null && isIncremental) {
+                            final Map<String, String> stateMap = new 
HashMap<>(state.toMap());
+                            stateMap.put(resourceName, currentExecutionTime);
+                            updateState(context, stateMap);
+                        }
+                    } else {
+                        getLogger().debug("Empty response when requested 
Shopify resource: [{}]", resourceName);
+                        session.remove(flowFile);
+                    }
+                } else if (response.statusCode() >= 400) {
+                    if (response.statusCode() == TOO_MANY_REQUESTS) {
+                        context.yield();
+                        throw new ProcessException(String.format(
+                                "Rate limit exceeded, yielding before retrying 
request. HTTP %d error for requested URI [%s]",
+                                response.statusCode(), resourceName));
+                    } else {
+                        context.yield();
+                        getLogger().warn("HTTP {} error for requested Shopify 
resource [{}]", response.statusCode(),
+                                resourceName);
+                    }
+                }

Review Comment:
   Shouldn't we at least log something if the response code is neither 200 nor 
an error code (>= 400)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to