turcsanyip commented on code in PR #6350:
URL: https://github.com/apache/nifi/pull/6350#discussion_r970049032


##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)

Review Comment:
   Using the value `0` to mean "use the default page size" seems a bit too 
technical to me (though I know other processors do it).
   I'd suggest making this property optional, with valid values 1..100, and 
leaving it unset (no value) to use the default page size. 



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new 
PropertyDescriptor.Builder()
+            .name("query-time-window-lag")
+            .displayName("Query Time Window Lag")
+            .description("The amount of lag to add to the query time window. 
Set this property to avoid missing records when the clock of your local 
machines"
+                    + " and Airtable servers' clock are not in sync. Must be 
greater than or equal to 1 second.")
+            .defaultValue("3 s")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("max-records-per-flowfile")
+            .displayName("Max Records Per FlowFile")
+            .description("The maximum number of result records that will be 
included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value 
specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)

Review Comment:
   As with Page Size, this should be an optional property, and `0` should not 
be accepted as a valid value.



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new 
PropertyDescriptor.Builder()
+            .name("query-time-window-lag")
+            .displayName("Query Time Window Lag")
+            .description("The amount of lag to add to the query time window. 
Set this property to avoid missing records when the clock of your local 
machines"
+                    + " and Airtable servers' clock are not in sync. Must be 
greater than or equal to 1 second.")
+            .defaultValue("3 s")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("max-records-per-flowfile")
+            .displayName("Max Records Per FlowFile")
+            .description("The maximum number of result records that will be 
included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value 
specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Web Client Service Provider to use for Airtable REST 
API requests")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
query.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_KEY,
+            BASE_ID,
+            TABLE_ID,
+            FIELDS,
+            CUSTOM_FILTER,
+            PAGE_SIZE,
+            QUERY_TIME_WINDOW_LAG,

Review Comment:
   I'd suggest moving `Page Size` after `Query Time Window Lag`, because the lag 
is more important and is mandatory.
   
   It could also be renamed to `Query Page Size` to make it clear what it is 
used for.



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/resources/docs/org.apache.nifi.processors.airtable.QueryAirtableTable/additionalDetails.html:
##########
@@ -0,0 +1,61 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<head>
+    <meta charset="utf-8"/>
+    <title>QueryAirtableTable</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+    <style>
+h2 {margin-top: 4em}
+h3 {margin-top: 3em}
+td {text-align: left}
+    </style>
+</head>
+
+<body>
+
+<h1>QueryAirtableTable</h1>
+
+<h3>Description</h3>
+<p>
+    Airtable is a spreadsheet-database hybrid. In Airtable, an application is 
called a base, and each base can have multiple tables.
+    A table consists of records (rows) and each record can have multiple 
fields (columns).
+    The QueryAirtableTable processor can query records from a single base and 
table via Airtable's REST API.
+    The processor utilizes streams to be able to handle a large number of 
records.
+    It can also split large record sets into multiple FlowFiles, just like a 
database processor.
+</p>
+
+<h3>API Key</h3>
+<p>
+    Airtable REST API calls require an API Key that needs to be passed in a 
request. An Airtable account is required to generate an API Key.
+</p>
+
+<h3>API rate limit</h3>
+<p>
+    The Airtable REST API limits the number of requests that can be sent on a 
per-base basis to avoid bottlenecks. Currently, this limit is 5 requests per 
second per base.
+    If this limit is exceeded you can't make another request for 30 seconds. 
It's your responsibility to handle this rate limit.

Review Comment:
   I think we should provide some hints on it, like:
   ```suggestion
       If this limit is exceeded you can't make another request for 30 seconds. 
It's your responsibility to handle this rate limit by configuring Yield 
Duration and Run Schedule properly. It is recommended to start with the 
default settings and to increase both parameters when rate limit issues occur.
   ```
   



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new 
PropertyDescriptor.Builder()
+            .name("query-time-window-lag")
+            .displayName("Query Time Window Lag")
+            .description("The amount of lag to add to the query time window. 
Set this property to avoid missing records when the clock of your local 
machines"
+                    + " and Airtable servers' clock are not in sync. Must be 
greater than or equal to 1 second.")
+            .defaultValue("3 s")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("max-records-per-flowfile")
+            .displayName("Max Records Per FlowFile")
+            .description("The maximum number of result records that will be 
included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value 
specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Web Client Service Provider to use for Airtable REST 
API requests")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
query.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_KEY,
+            BASE_ID,
+            TABLE_ID,
+            FIELDS,
+            CUSTOM_FILTER,
+            PAGE_SIZE,
+            QUERY_TIME_WINDOW_LAG,
+            MAX_RECORDS_PER_FLOWFILE,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    private static final String LAST_RECORD_FETCH_TIME = 
"last_record_fetch_time";
+
+    private volatile AirtableRestService airtableRestService;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final String apiUrl = 
context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+        final String apiKey = context.getProperty(API_KEY).getValue();
+        final String baseId = 
context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue();
+        final String tableId = 
context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue();
+        final WebClientServiceProvider webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+        airtableRestService = new 
AirtableRestService(webClientServiceProvider, apiUrl, apiKey, baseId, tableId);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final Integer maxRecordsPerFlowFile = 
context.getProperty(MAX_RECORDS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger();
+        final Long queryTimeWindowLagSeconds = 
context.getProperty(QUERY_TIME_WINDOW_LAG).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+
+        final StateMap state;
+        try {
+            state = context.getStateManager().getState(Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to get cluster state", e);
+        }
+
+        final String lastRecordFetchDateTime = 
state.get(LAST_RECORD_FETCH_TIME);
+        final String currentRecordFetchDateTime = OffsetDateTime.now()
+                .minusSeconds(queryTimeWindowLagSeconds)
+                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+
+        final AirtableGetRecordsParameters getRecordsParameters = 
buildGetRecordsParameters(context, lastRecordFetchDateTime, 
currentRecordFetchDateTime);
+        final AirtableRetrieveTableResult retrieveTableResult;
+        try {
+            final AirtableTableRetriever tableRetriever = new 
AirtableTableRetriever(airtableRestService, getRecordsParameters, 
maxRecordsPerFlowFile);
+            retrieveTableResult = tableRetriever.retrieveAll(session);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to read Airtable records", e);
+        } catch (RateLimitExceededException e) {
+            context.yield();
+            throw new ProcessException("Airtable REST API rate limit exceeded 
while reading records", e);
+        }
+
+        final Map<String, String> newState = new HashMap<>(state.toMap());
+        newState.put(LAST_RECORD_FETCH_TIME, currentRecordFetchDateTime);
+        try {
+            context.getStateManager().setState(newState, Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to update cluster state", e);
+        }
+
+        final List<FlowFile> flowFiles = retrieveTableResult.getFlowFiles();
+        if (flowFiles.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        if (maxRecordsPerFlowFile != null && maxRecordsPerFlowFile > 0) {
+            fragmentFlowFiles(session, flowFiles);
+        }
+        final String transitUri = 
airtableRestService.createUriBuilder(false).build().toString();
+        transferFlowFiles(session, flowFiles, 
retrieveTableResult.getTotalRecordCount(), transitUri);
+    }
+
+    private AirtableGetRecordsParameters buildGetRecordsParameters(final 
ProcessContext context,
+            final String lastRecordFetchTime,
+            final String nowDateTimeString) {
+        Objects.requireNonNull(context);
+        Objects.requireNonNull(nowDateTimeString);
+
+        final String fieldsProperty = 
context.getProperty(FIELDS).evaluateAttributeExpressions().getValue();
+        final String customFilter = 
context.getProperty(CUSTOM_FILTER).evaluateAttributeExpressions().getValue();
+        final Integer pageSize = 
context.getProperty(PAGE_SIZE).evaluateAttributeExpressions().asInteger();
+
+        final AirtableGetRecordsParameters.Builder getRecordsParametersBuilder 
= new AirtableGetRecordsParameters.Builder();
+        if (lastRecordFetchTime != null) {
+            getRecordsParametersBuilder.modifiedAfter(lastRecordFetchTime);
+        }
+        getRecordsParametersBuilder.modifiedBefore(nowDateTimeString);
+        if (fieldsProperty != null) {
+            
getRecordsParametersBuilder.fields(Arrays.stream(fieldsProperty.split(",")).map(String::trim).collect(Collectors.toList()));
+        }
+        getRecordsParametersBuilder.customFilter(customFilter);
+        if (pageSize != null && pageSize > 0) {
+            getRecordsParametersBuilder.pageSize(pageSize);
+        }
+
+        return getRecordsParametersBuilder.build();
+    }
+
+    private void fragmentFlowFiles(final ProcessSession session, final 
List<FlowFile> flowFiles) {

Review Comment:
   The FlowFiles are already fragmented at this point, so 
`addFragmentAttributes()` would be a better method name.



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new 
PropertyDescriptor.Builder()
+            .name("query-time-window-lag")
+            .displayName("Query Time Window Lag")
+            .description("The amount of lag to add to the query time window. 
Set this property to avoid missing records when the clock of your local 
machines"
+                    + " and Airtable servers' clock are not in sync. Must be 
greater than or equal to 1 second.")
+            .defaultValue("3 s")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("max-records-per-flowfile")
+            .displayName("Max Records Per FlowFile")
+            .description("The maximum number of result records that will be 
included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value 
specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Web Client Service Provider to use for Airtable REST 
API requests")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
query.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_KEY,
+            BASE_ID,
+            TABLE_ID,
+            FIELDS,
+            CUSTOM_FILTER,
+            PAGE_SIZE,
+            QUERY_TIME_WINDOW_LAG,
+            MAX_RECORDS_PER_FLOWFILE,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    private static final String LAST_RECORD_FETCH_TIME = 
"last_record_fetch_time";

Review Comment:
   To make the labels in the UI more consistent, the state key should be called 
`last_query_time_window_end` (matching the `Query Time Window Lag` property).
   
   The constant and local variable names can be adjusted accordingly, but the 
main point here is to make the state more understandable for the user.



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new 
PropertyDescriptor.Builder()
+            .name("query-time-window-lag")
+            .displayName("Query Time Window Lag")
+            .description("The amount of lag to add to the query time window. 
Set this property to avoid missing records when the clock of your local 
machines"
+                    + " and Airtable servers' clock are not in sync. Must be 
greater than or equal to 1 second.")
+            .defaultValue("3 s")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("max-records-per-flowfile")
+            .displayName("Max Records Per FlowFile")
+            .description("The maximum number of result records that will be 
included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value 
specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Web Client Service Provider to use for Airtable REST 
API requests")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
query.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_KEY,
+            BASE_ID,
+            TABLE_ID,
+            FIELDS,
+            CUSTOM_FILTER,
+            PAGE_SIZE,
+            QUERY_TIME_WINDOW_LAG,
+            MAX_RECORDS_PER_FLOWFILE,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    private static final String LAST_RECORD_FETCH_TIME = 
"last_record_fetch_time";
+
+    private volatile AirtableRestService airtableRestService;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final String apiUrl = 
context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+        final String apiKey = context.getProperty(API_KEY).getValue();
+        final String baseId = 
context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue();
+        final String tableId = 
context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue();
+        final WebClientServiceProvider webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+        airtableRestService = new 
AirtableRestService(webClientServiceProvider, apiUrl, apiKey, baseId, tableId);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final Integer maxRecordsPerFlowFile = 
context.getProperty(MAX_RECORDS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger();
+        final Long queryTimeWindowLagSeconds = 
context.getProperty(QUERY_TIME_WINDOW_LAG).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+
+        final StateMap state;
+        try {
+            state = context.getStateManager().getState(Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to get cluster state", e);
+        }
+
+        final String lastRecordFetchDateTime = 
state.get(LAST_RECORD_FETCH_TIME);
+        final String currentRecordFetchDateTime = OffsetDateTime.now()
+                .minusSeconds(queryTimeWindowLagSeconds)
+                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);

Review Comment:
   Since Airtable stores the records' Last Modified field with second precision, 
I recommend using the same precision for the query time window as well. It is 
also more readable for humans and can make troubleshooting a bit easier. 
Note that the suggestion below also requires importing `java.time.temporal.ChronoUnit`.
   ```suggestion
           final String currentRecordFetchDateTime = OffsetDateTime.now()
                   .minusSeconds(queryTimeWindowLagSeconds)
                   .truncatedTo(ChronoUnit.SECONDS)
                   .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
   ```



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new 
PropertyDescriptor.Builder()
+            .name("query-time-window-lag")
+            .displayName("Query Time Window Lag")
+            .description("The amount of lag to add to the query time window. 
Set this property to avoid missing records when the clock of your local 
machines"

Review Comment:
   ```suggestion
               .description("The amount of lag to be applied to the query time 
window's end point. Set this property to avoid missing records when the clock 
of your local machines"
   ```



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new 
PropertyDescriptor.Builder()
+            .name("query-time-window-lag")
+            .displayName("Query Time Window Lag")
+            .description("The amount of lag to add to the query time window. 
Set this property to avoid missing records when the clock of your local 
machines"
+                    + " and Airtable servers' clock are not in sync. Must be 
greater than or equal to 1 second.")
+            .defaultValue("3 s")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("max-records-per-flowfile")
+            .displayName("Max Records Per FlowFile")
+            .description("The maximum number of result records that will be 
included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value 
specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Web Client Service Provider to use for Airtable REST 
API requests")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
query.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_KEY,
+            BASE_ID,
+            TABLE_ID,
+            FIELDS,
+            CUSTOM_FILTER,
+            PAGE_SIZE,
+            QUERY_TIME_WINDOW_LAG,
+            MAX_RECORDS_PER_FLOWFILE,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    private static final String LAST_RECORD_FETCH_TIME = 
"last_record_fetch_time";
+
+    private volatile AirtableRestService airtableRestService;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final String apiUrl = 
context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+        final String apiKey = context.getProperty(API_KEY).getValue();
+        final String baseId = 
context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue();
+        final String tableId = 
context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue();
+        final WebClientServiceProvider webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+        airtableRestService = new 
AirtableRestService(webClientServiceProvider, apiUrl, apiKey, baseId, tableId);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final Integer maxRecordsPerFlowFile = 
context.getProperty(MAX_RECORDS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger();
+        final Long queryTimeWindowLagSeconds = 
context.getProperty(QUERY_TIME_WINDOW_LAG).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+
+        final StateMap state;
+        try {
+            state = context.getStateManager().getState(Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to get cluster state", e);
+        }
+
+        final String lastRecordFetchDateTime = 
state.get(LAST_RECORD_FETCH_TIME);
+        final String currentRecordFetchDateTime = OffsetDateTime.now()
+                .minusSeconds(queryTimeWindowLagSeconds)
+                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+
+        final AirtableGetRecordsParameters getRecordsParameters = 
buildGetRecordsParameters(context, lastRecordFetchDateTime, 
currentRecordFetchDateTime);
+        final AirtableRetrieveTableResult retrieveTableResult;
+        try {
+            final AirtableTableRetriever tableRetriever = new 
AirtableTableRetriever(airtableRestService, getRecordsParameters, 
maxRecordsPerFlowFile);
+            retrieveTableResult = tableRetriever.retrieveAll(session);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to read Airtable records", e);
+        } catch (RateLimitExceededException e) {
+            context.yield();
+            throw new ProcessException("Airtable REST API rate limit exceeded 
while reading records", e);
+        }
+
+        final Map<String, String> newState = new HashMap<>(state.toMap());
+        newState.put(LAST_RECORD_FETCH_TIME, currentRecordFetchDateTime);
+        try {
+            context.getStateManager().setState(newState, Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to update cluster state", e);
+        }
+
+        final List<FlowFile> flowFiles = retrieveTableResult.getFlowFiles();
+        if (flowFiles.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        if (maxRecordsPerFlowFile != null && maxRecordsPerFlowFile > 0) {
+            fragmentFlowFiles(session, flowFiles);
+        }
+        final String transitUri = 
airtableRestService.createUriBuilder(false).build().toString();

Review Comment:
   I think we can include the port in `transitUri` as well (in most cases it will 
not be present anyway), so the boolean flag is not needed.



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableRestService.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.Range;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+public class AirtableRestService {
+
+    public static final String API_V0_BASE_URL = "https://api.airtable.com/v0";;
+
+    private static final int TOO_MANY_REQUESTS = 429;
+    private static final Range<Integer> SUCCESSFUL_RESPONSE_RANGE = 
Range.between(200, 299);
+
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String apiUrl;
+    private final String apiKey;
+    private final String baseId;
+    private final String tableId;
+
+    public AirtableRestService(final WebClientServiceProvider 
webClientServiceProvider,
+            final String apiUrl,
+            final String apiKey,
+            final String baseId,
+            final String tableId) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.apiUrl = apiUrl;
+        this.apiKey = apiKey;
+        this.baseId = baseId;
+        this.tableId = tableId;
+    }
+
+    public <R> R getRecords(final AirtableGetRecordsParameters 
getRecordsParameters, final Function<InputStream, R> callback) {
+        final URI uri = buildUri(getRecordsParameters);
+        try (final HttpResponseEntity response = 
webClientServiceProvider.getWebClientService()
+                .get()
+                .uri(uri)
+                .header("Authorization", "Bearer " + apiKey)
+                .retrieve()) {
+
+            final InputStream bodyInputStream = response.body();
+            if (SUCCESSFUL_RESPONSE_RANGE.contains(response.statusCode())) {
+                return callback.apply(bodyInputStream);
+            }
+            if (response.statusCode() == TOO_MANY_REQUESTS) {
+                throw new RateLimitExceededException();
+            }
+            final StringBuilder exceptionMessageBuilder = new 
StringBuilder("Error response. Code: " + response.statusCode());
+            final String bodyText = IOUtils.toString(bodyInputStream, 
StandardCharsets.UTF_8);
+            if (bodyText != null) {
+                exceptionMessageBuilder.append(" Body: ").append(bodyText);
+            }
+
+            throw new ProcessException(exceptionMessageBuilder.toString());
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("Airtable HTTP request 
failed [%s]", uri), e);

Review Comment:
   ```suggestion
               throw new ProcessException(String.format("Airtable HTTP request 
failed [%s]", uri), e);
   ```



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new 
PropertyDescriptor.Builder()
+            .name("query-time-window-lag")
+            .displayName("Query Time Window Lag")
+            .description("The amount of lag to add to the query time window. 
Set this property to avoid missing records when the clock of your local 
machines"
+                    + " and Airtable servers' clock are not in sync. Must be 
greater than or equal to 1 second.")
+            .defaultValue("3 s")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("max-records-per-flowfile")
+            .displayName("Max Records Per FlowFile")
+            .description("The maximum number of result records that will be 
included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value 
specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Web Client Service Provider to use for Airtable REST 
API requests")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
query.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_KEY,
+            BASE_ID,
+            TABLE_ID,
+            FIELDS,
+            CUSTOM_FILTER,
+            PAGE_SIZE,
+            QUERY_TIME_WINDOW_LAG,
+            MAX_RECORDS_PER_FLOWFILE,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    private static final String LAST_RECORD_FETCH_TIME = 
"last_record_fetch_time";
+
+    private volatile AirtableRestService airtableRestService;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final String apiUrl = 
context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+        final String apiKey = context.getProperty(API_KEY).getValue();
+        final String baseId = 
context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue();
+        final String tableId = 
context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue();
+        final WebClientServiceProvider webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+        airtableRestService = new 
AirtableRestService(webClientServiceProvider, apiUrl, apiKey, baseId, tableId);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final Integer maxRecordsPerFlowFile = 
context.getProperty(MAX_RECORDS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger();
+        final Long queryTimeWindowLagSeconds = 
context.getProperty(QUERY_TIME_WINDOW_LAG).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+
+        final StateMap state;
+        try {
+            state = context.getStateManager().getState(Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to get cluster state", e);
+        }
+
+        final String lastRecordFetchDateTime = 
state.get(LAST_RECORD_FETCH_TIME);
+        final String currentRecordFetchDateTime = OffsetDateTime.now()
+                .minusSeconds(queryTimeWindowLagSeconds)
+                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+
+        final AirtableGetRecordsParameters getRecordsParameters = 
buildGetRecordsParameters(context, lastRecordFetchDateTime, 
currentRecordFetchDateTime);
+        final AirtableRetrieveTableResult retrieveTableResult;
+        try {
+            final AirtableTableRetriever tableRetriever = new 
AirtableTableRetriever(airtableRestService, getRecordsParameters, 
maxRecordsPerFlowFile);
+            retrieveTableResult = tableRetriever.retrieveAll(session);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to read Airtable records", e);
+        } catch (RateLimitExceededException e) {
+            context.yield();
+            throw new ProcessException("Airtable REST API rate limit exceeded 
while reading records", e);
+        }
+
+        final Map<String, String> newState = new HashMap<>(state.toMap());
+        newState.put(LAST_RECORD_FETCH_TIME, currentRecordFetchDateTime);
+        try {
+            context.getStateManager().setState(newState, Scope.CLUSTER);

Review Comment:
   Please call `ProcessSession.setState()` instead. The `StateManager` returned 
by `context.getStateManager()` is not transactional: the state is saved 
immediately when `setState()` is called. When records are found, the state must 
be saved together with the outgoing FlowFiles, i.e. only when the session is 
successfully committed.
   ```suggestion
               session.setState(newState, Scope.CLUSTER);
   ```



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static 
org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult;
+import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
+import 
org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are 
incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom 
Filter' property which supports the formulas provided by the Airtable API."
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's 
time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each 
subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful 
query only the updated records will be returned in the next query."
+        + " State is stored across the cluster, so this Processor can run only 
on the Primary Node and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous one left off without 
duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Records Per FlowFile' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Records Per FlowFile' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Records Per FlowFile' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order "
+                + "FlowFiles were produced"),
+})
+@DefaultSettings(yieldDuration = "15 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the 
domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be 
generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table ID")
+            .description("The name or the ID of the Airtable table to be 
queried.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the 
table. Both the field's name and ID can be used.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new 
PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be 
between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new 
PropertyDescriptor.Builder()
+            .name("query-time-window-lag")
+            .displayName("Query Time Window Lag")
+            .description("The amount of lag to add to the query time window. 
Set this property to avoid missing records when the clock of your local 
machines"
+                    + " and Airtable servers' clock are not in sync. Must be 
greater than or equal to 1 second.")
+            .defaultValue("3 s")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("max-records-per-flowfile")
+            .displayName("Max Records Per FlowFile")
+            .description("The maximum number of result records that will be 
included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value 
specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Web Client Service Provider to use for Airtable REST 
API requests")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful 
query.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_KEY,
+            BASE_ID,
+            TABLE_ID,
+            FIELDS,
+            CUSTOM_FILTER,
+            PAGE_SIZE,
+            QUERY_TIME_WINDOW_LAG,
+            MAX_RECORDS_PER_FLOWFILE,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
+    private static final String LAST_RECORD_FETCH_TIME = 
"last_record_fetch_time";
+
+    private volatile AirtableRestService airtableRestService;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final String apiUrl = 
context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+        final String apiKey = context.getProperty(API_KEY).getValue();
+        final String baseId = 
context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue();
+        final String tableId = 
context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue();
+        final WebClientServiceProvider webClientServiceProvider = 
context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+        airtableRestService = new 
AirtableRestService(webClientServiceProvider, apiUrl, apiKey, baseId, tableId);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final Integer maxRecordsPerFlowFile = 
context.getProperty(MAX_RECORDS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger();
+        final Long queryTimeWindowLagSeconds = 
context.getProperty(QUERY_TIME_WINDOW_LAG).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+
+        final StateMap state;
+        try {
+            state = context.getStateManager().getState(Scope.CLUSTER);

Review Comment:
   ```suggestion
               state = session.getState(Scope.CLUSTER);
   ```



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableRestService.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.Range;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+public class AirtableRestService {
+
+    public static final String API_V0_BASE_URL = "https://api.airtable.com/v0";;
+
+    private static final int TOO_MANY_REQUESTS = 429;
+    private static final Range<Integer> SUCCESSFUL_RESPONSE_RANGE = 
Range.between(200, 299);
+
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String apiUrl;
+    private final String apiKey;
+    private final String baseId;
+    private final String tableId;
+
+    public AirtableRestService(final WebClientServiceProvider 
webClientServiceProvider,
+            final String apiUrl,
+            final String apiKey,
+            final String baseId,
+            final String tableId) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.apiUrl = apiUrl;
+        this.apiKey = apiKey;
+        this.baseId = baseId;
+        this.tableId = tableId;
+    }
+
+    public <R> R getRecords(final AirtableGetRecordsParameters 
getRecordsParameters, final Function<InputStream, R> callback) {
+        final URI uri = buildUri(getRecordsParameters);
+        try (final HttpResponseEntity response = 
webClientServiceProvider.getWebClientService()
+                .get()
+                .uri(uri)
+                .header("Authorization", "Bearer " + apiKey)
+                .retrieve()) {
+
+            final InputStream bodyInputStream = response.body();
+            if (SUCCESSFUL_RESPONSE_RANGE.contains(response.statusCode())) {
+                return callback.apply(bodyInputStream);
+            }
+            if (response.statusCode() == TOO_MANY_REQUESTS) {
+                throw new RateLimitExceededException();
+            }
+            final StringBuilder exceptionMessageBuilder = new 
StringBuilder("Error response. Code: " + response.statusCode());
+            final String bodyText = IOUtils.toString(bodyInputStream, 
StandardCharsets.UTF_8);
+            if (bodyText != null) {
+                exceptionMessageBuilder.append(" Body: ").append(bodyText);
+            }
+
+            throw new ProcessException(exceptionMessageBuilder.toString());
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("Airtable HTTP request 
failed [%s]", uri), e);
+        }
+    }
+
+    public HttpUriBuilder createUriBuilder(final boolean includePort) {
+        final URI uri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = 
webClientServiceProvider.getHttpUriBuilder()
+                .scheme(uri.getScheme())
+                .host(uri.getHost())
+                .encodedPath(uri.getPath())
+                .addPathSegment(baseId)
+                .addPathSegment(tableId);
+        if (includePort && uri.getPort() != -1) {
+            uriBuilder.port(uri.getPort());
+        }
+        return uriBuilder;
+    }
+
+    private URI buildUri(AirtableGetRecordsParameters getRecordsParameters) {
+        final HttpUriBuilder uriBuilder = createUriBuilder(true);
+        for (final String field : getRecordsParameters.getFields()) {
+            uriBuilder.addQueryParameter("fields[]", field);
+        }
+
+        final List<String> filters = new ArrayList<>();
+        getRecordsParameters.getCustomFilter()
+                .ifPresent(filters::add);
+        getRecordsParameters.getModifiedAfter()
+                .map(modifiedAfter -> 
"IS_AFTER(LAST_MODIFIED_TIME(),DATETIME_PARSE(\"" + modifiedAfter + "\"))")
+                .ifPresent(filters::add);
+        getRecordsParameters.getModifiedBefore()
+                .map(modifiedBefore -> 
"IS_BEFORE(LAST_MODIFIED_TIME(),DATETIME_PARSE(\"" + modifiedBefore + "\"))")
+                .ifPresent(filters::add);

Review Comment:
   As far as I can see (please double-check it), both `IS_AFTER` and 
`IS_BEFORE` are exclusive, but one endpoint of the query time window must be 
inclusive in order not to lose records whose Last Modified time is exactly equal 
to the time window start/end (such a record would match neither the current 
query nor the next one). The start point should be inclusive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to