exceptionfactory commented on code in PR #6350:
URL: https://github.com/apache/nifi/pull/6350#discussion_r960580468


##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/record/AirtableRecordSet.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable.record;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.AirtableRestService.RateLimitExceededException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+public class AirtableRecordSet implements RecordSet, Closeable {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+
+    final AirtableJsonTreeRowRecordReaderFactory recordReaderFactory;
+    final AirtableRestService restService;
+    final AirtableGetRecordsParameters getRecordsParameters;
+    byte[] recordsJson;
+    JsonTreeRowRecordReader reader = null;
+
+    public AirtableRecordSet(final byte[] recordsJson,
+            final AirtableJsonTreeRowRecordReaderFactory recordReaderFactory,
+            final AirtableRestService restService,
+            final AirtableGetRecordsParameters getRecordsParameters) {
+        this.recordReaderFactory = recordReaderFactory;
+        this.restService = restService;
+        this.getRecordsParameters = getRecordsParameters;
+        this.recordsJson = recordsJson;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return recordReaderFactory.recordSchema;
+    }
+
+    @Override
+    public Record next() throws IOException {
+        if (reader == null) {
+            final ByteArrayInputStream inputStream = new ByteArrayInputStream(recordsJson);
+            try {
+                reader = recordReaderFactory.create(inputStream);
+            } catch (MalformedRecordException e) {
+                throw new IOException("Failed to create Airtable record reader", e);
+            }
+        }
+        final Record record;
+        try {
+            record = reader.nextRecord();
+        } catch (MalformedRecordException e) {
+            throw new IOException("Failed to read next Airtable record", e);
+        }
+
+        if (record != null) {
+            return record;
+        }
+
+        final Optional<String> offset = readOffsetFromJson();
+        if (offset.isPresent()) {
+            try {
+                recordsJson = restService.getRecords(getRecordsParameters.withOffset(offset.get()));
+            } catch (RateLimitExceededException e) {
+                throw new IOException("REST API rate limit exceeded while fetching Airtable records", e);
+            }
+            reader = null;
+            return next();
+        }
+
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    private Optional<String> readOffsetFromJson() throws IOException {
+        try (final JsonParser jsonParser = JSON_FACTORY.createParser(recordsJson)) {

Review Comment:
   This approach is an improvement over using JSON Path, thanks for making the
adjustments. However, it still requires parsing the response JSON multiple
times. A custom Record Reader seems necessary to read the records and to capture
and store the offset in the same pass.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java:
##########
@@ -67,7 +67,7 @@
         + "a field that is not present in the schema, that field will be skipped. "
         + "See the Usage of the Controller Service for more information and examples.")
 @SeeAlso(JsonPathReader.class)
-public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory {
+public class JsonTreeReader extends SchemaRegistryService implements JsonRecordReaderFactory {

Review Comment:
   As mentioned on the extended interface, this change appears unnecessary.



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/JsonRecordReaderFactory.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+public interface JsonRecordReaderFactory extends RecordReaderFactory {
+
+}

Review Comment:
   Is there a particular reason for this extended interface? A format-specific 
interface does not belong in this module, and it seems like it should be 
removed.



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableRestService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable.service;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import javax.ws.rs.core.UriBuilder;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.Range;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.WebClientService;
+
+public class AirtableRestService {
+
+    public static final String API_V0_BASE_URL = "https://api.airtable.com/v0";
+
+    private static final int TOO_MANY_REQUESTS = 429;
+    private static final Range<Integer> SUCCESSFUL_RESPONSE_RANGE = Range.between(200, 299);
+
+    private final WebClientService webClientService;
+    private final String apiUrl;
+    private final String apiKey;
+    private final String baseId;
+    private final String tableId;
+
+    public AirtableRestService(final WebClientService webClientService,
+            final String apiUrl,
+            final String apiKey,
+            final String baseId,
+            final String tableId) {
+        this.webClientService = webClientService;
+        this.apiUrl = apiUrl;
+        this.apiKey = apiKey;
+        this.baseId = baseId;
+        this.tableId = tableId;
+    }
+
+    public byte[] getRecords(final AirtableGetRecordsParameters filter) throws RateLimitExceededException {
+        final URI uri = buildUri(filter);
+        try (final HttpResponseEntity response = webClientService.get()
+                .uri(uri)
+                .header("Authorization", "Bearer " + apiKey)
+                .retrieve()) {
+
+            if (SUCCESSFUL_RESPONSE_RANGE.contains(response.statusCode())) {
+                return IOUtils.toByteArray(response.body());
+            }
+            if (response.statusCode() == TOO_MANY_REQUESTS) {
+                throw new RateLimitExceededException();
+            }
+            final StringBuilder exceptionMessageBuilder = new StringBuilder("Invalid response. Code: " + response.statusCode());
+            final String bodyText = IOUtils.toString(response.body(), StandardCharsets.UTF_8);
+            if (bodyText != null) {
+                exceptionMessageBuilder.append(" Body: ").append(bodyText);
+            }
+
+            throw new ProcessException(exceptionMessageBuilder.toString());
+        } catch (IOException e) {
+            throw new ProcessException(String.format("Airtable HTTP request failed [%s]", uri), e);
+        }
+    }
+
+    public static class RateLimitExceededException extends Exception {

Review Comment:
   It seems like this would be better defined as a subclass of
`RuntimeException`. Making this a separate class, as opposed to a public static
nested class, also seems like a better approach.
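A minimal sketch of the suggested shape, assuming a single message constructor is enough. In the processor module the class would be `public` in its own file; it is package-private here only so the snippet fits in one compilation unit. `RateLimitExample` is a hypothetical stand-in that mirrors the 429 branch of `getRecords()`.

```java
// Standalone unchecked exception instead of a checked, nested AirtableRestService member.
class RateLimitExceededException extends RuntimeException {
    RateLimitExceededException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the status handling in AirtableRestService.getRecords().
final class RateLimitExample {
    private static final int TOO_MANY_REQUESTS = 429;

    static void checkStatus(int statusCode) {
        if (statusCode == TOO_MANY_REQUESTS) {
            // Unchecked, so callers such as RecordSet.next() need no throws clause or catch-and-wrap.
            throw new RateLimitExceededException("Airtable REST API rate limit exceeded");
        }
    }
}
```

Because the exception is unchecked, `AirtableRecordSet.next()` could drop its catch-and-wrap into `IOException`, leaving the handling to wherever the rate limit is actually acted on (for example, yielding the processor).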



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.record.AirtableRecordSet;
+import org.apache.nifi.processors.airtable.record.AirtableJsonTreeRowRecordReaderFactory;
+import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.AirtableRestService.RateLimitExceededException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.JsonRecordReaderFactory;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API."
+        + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query."
+        + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
+})
+@DefaultSettings(yieldDuration = "35 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table Name or ID")
+            .description("The name or the ID of the Airtable table to be queried.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the table. Both the field's name and ID can be used.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+            .name("max-records-per-flow-file")
+            .displayName("Max Records Per Flow File")
+            .description("The maximum number of result records that will be included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all records"
+                    + " have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles"
+                    + " to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will"
+                    + " be committed, thus releasing the FlowFiles to the downstream relationship.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor METADATA_STRATEGY = new PropertyDescriptor.Builder()
+            .name("metadata-strategy")
+            .displayName("Metadata Strategy")
+            .description("Strategy to use for fetching record schema. Currently only 'Use JSON Record Reader' is supported."
+                    + " When Airtable Metadata API becomes more stable it will be possible to fetch the record schema through it.")
+            .required(true)
+            .defaultValue(MetadataStrategy.USE_JSON_RECORD_READER.name())
+            .allowableValues(MetadataStrategy.class)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_READER = new PropertyDescriptor.Builder()
+            .name("schema-reader")
+            .displayName("Schema Reader")
+            .description("JsonTreeReader service to use for fetching the schema of records returned by the Airtable REST API")
+            .dependsOn(METADATA_STRATEGY, MetadataStrategy.USE_JSON_RECORD_READER.name())
+            .identifiesControllerService(JsonRecordReaderFactory.class)

Review Comment:
   Although this interface allows narrowing the supported type, it introduces a 
format-specific interface to the API, which is not ideal.
   
   Is this necessary given the custom `AirtableJsonTreeRowRecordReaderFactory`?



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/record/AirtableJsonTreeRowRecordReaderFactory.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable.record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class AirtableJsonTreeRowRecordReaderFactory {
+
+    private static final String STARTING_FIELD_NAME = "records";
+    private static final String DATE_FORMAT = "yyyy-MM-dd";
+    private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
+    private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";

Review Comment:
   Do these formats apply to all records in Airtable? It would be helpful to 
provide a short comment with a link to the standard if available.
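One way to answer this is a quick check against sample values. The sketch below uses `java.time` only as an approximation, because the formatter that NiFi's record readers actually apply may interpret pattern letters differently. It compares a millisecond pattern using the `X` offset letter (as in `TIME_FORMAT`) with strict ISO-8601 parsing via `Instant.parse`. The timestamp is a made-up value shaped like Airtable's `createdTime`. The same check shows that the `1970-00-01T00:00:00.000Z` value in `QueryAirtableTableIT` is not a valid ISO-8601 instant, since month `00` does not exist.

```java
import java.time.DateTimeException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

final class AirtableTimestampCheck {
    // Millisecond precision with the 'X' offset letter, which parses a trailing "Z" as UTC.
    static final DateTimeFormatter MILLIS_ISO_OFFSET =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");

    static Instant parseWithPattern(String value) {
        return OffsetDateTime.parse(value, MILLIS_ISO_OFFSET).toInstant();
    }

    // Strict ISO-8601 instant parsing, used as the reference point.
    static Instant parseIso(String value) {
        return Instant.parse(value);
    }

    // True when the value is a valid ISO-8601 instant (rejects, for example, month 00).
    static boolean isValidIso(String value) {
        try {
            Instant.parse(value);
            return true;
        } catch (DateTimeException e) {
            return false;
        }
    }
}
```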



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/QueryAirtableTableIT.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import okhttp3.HttpUrl;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class QueryAirtableTableIT {
+
+    public static final String RECORDS_JSON_BODY = "{\"records\":[{"
+            + "\"id\":\"recabcdefghijklmn\","
+            + "\"createdTime\":\"1970-00-01T00:00:00.000Z\","
+            + "\"fields\":{\"foo\":\"bar\"}}]}";
+    public static final String RECORDS_WITH_OFFSET_JSON_BODY = "{\"records\":[{"
+            + "\"id\":\"recabcdefghijklmn\","
+            + "\"createdTime\":\"1970-00-01T00:00:00.000Z\","
+            + "\"fields\":{\"foo\":\"bar\"}}],"
+            + "\"offset\":\"ofsabcdefghijklmn\"}";
+    public static final String EXPECTED_RECORD_CONTENT =
+            "\"recabcdefghijklmn\",\"1970-00-01T00:00:00.000Z\",\"MapRecord[{foo=bar}]\"\n";
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        final Processor queryAirtableTable = new QueryAirtableTable();
+
+        runner = TestRunners.newTestRunner(queryAirtableTable);
+
+        final RecordReaderFactory schemaReader = new JsonTreeReader();
+        runner.addControllerService("reader", schemaReader);
+        runner.setProperty(schemaReader, JsonTreeReader.STARTING_FIELD_NAME, "records");
+        runner.setProperty(schemaReader, JsonTreeReader.STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_FIELD.getValue());
+        runner.setProperty(schemaReader, JsonTreeReader.SCHEMA_APPLICATION_STRATEGY, SchemaApplicationStrategy.SELECTED_PART.getValue());
+        runner.enableControllerService(schemaReader);
+
+        final RecordSetWriterFactory writer = new MockRecordWriter();
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        final WebClientServiceProvider webClientServiceProvider = new StandardWebClientServiceProvider();
+        runner.addControllerService("webClientService", webClientServiceProvider);
+        runner.enableControllerService(webClientServiceProvider);
+
+        runner.setProperty(QueryAirtableTable.API_KEY, "???");
+        runner.setProperty(QueryAirtableTable.BASE_ID, "appabcdefghijklmn");
+        runner.setProperty(QueryAirtableTable.TABLE_ID, "tblabcdefghijklmn");

Review Comment:
   Recommend using a placeholder word, or number, instead of random characters. 
Either `UUID.randomUUID().toString()` or just `BASE_ID` and `TABLE_ID`.
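A small sketch of both options. `QueryAirtableTableTestIds` and `expectedRequestPath` are hypothetical, and the path shape `{apiPath}/{baseId}/{tableId}` is an assumption about what the processor requests. Readable placeholders make an assertion on the request recorded by `MockWebServer.takeRequest()` self-explanatory, while random UUIDs suit cases where uniqueness matters more than readability.

```java
import java.util.UUID;

final class QueryAirtableTableTestIds {
    // Option 1: fixed, self-describing placeholders.
    static final String BASE_ID = "BASE_ID";
    static final String TABLE_ID = "TABLE_ID";

    // Option 2: unique random values.
    static String randomId() {
        return UUID.randomUUID().toString();
    }

    // Hypothetical helper, assuming the processor requests {apiPath}/{baseId}/{tableId}.
    static String expectedRequestPath(String apiPath, String baseId, String tableId) {
        return apiPath + "/" + baseId + "/" + tableId;
    }
}
```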



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/pom.xml:
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-airtable-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-airtable-processors</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-client-provider-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+            <version>2.1.1</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-common</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   Thanks for the reference. The standard `java.net.URI.create()` method can 
parse a URI string, which seems like it should work in this case, and avoid 
introducing these additional dependencies. If necessary, it may be worth 
extending the `HttpUriBuilder`, but either approach would be better than 
introducing these dependencies just for parsing a URI.
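As a rough sketch of the suggestion above, the snippet below assembles a request URI using only `java.net.URI` and `java.net.URLEncoder` from the JDK. The `{apiUrl}/{baseId}/{tableId}` path layout and the `pageSize`/`filterByFormula` query parameter names are assumptions for illustration, not necessarily what `AirtableRestService` sends. The class and method names are hypothetical.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds an Airtable-style records URI using only JDK classes.
final class AirtableUriSketch {

    private AirtableUriSketch() {
    }

    // Percent-encodes a path segment or query component as UTF-8.
    static String encode(final String value) {
        try {
            // URLEncoder uses form encoding ("+" for space); a literal "+" in the input
            // is already %2B, so every remaining "+" is a space and becomes %20.
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (final UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    // Assumed layout: {apiUrl}/{baseId}/{tableId}?name=value&...
    static URI buildRecordsUri(final String apiUrl, final String baseId, final String tableId,
                               final Map<String, String> queryParameters) {
        // URI.create parses and validates the configured base URL (IllegalArgumentException if malformed)
        final String base = URI.create(apiUrl).toString().replaceAll("/+$", "");
        final StringBuilder uri = new StringBuilder(base)
                .append('/').append(encode(baseId))
                .append('/').append(encode(tableId));
        if (!queryParameters.isEmpty()) {
            final StringJoiner query = new StringJoiner("&", "?", "");
            queryParameters.forEach((name, value) -> query.add(encode(name) + "=" + encode(value)));
            uri.append(query);
        }
        return URI.create(uri.toString());
    }

    public static void main(final String[] args) {
        final Map<String, String> parameters = new LinkedHashMap<>();
        parameters.put("pageSize", "50");
        parameters.put("filterByFormula", "{Status} = 'Done'");
        // Prints https://api.airtable.com/v0/appBase/My%20Table?pageSize=50&filterByFormula=%7BStatus%7D%20%3D%20%27Done%27
        System.out.println(buildRecordsUri("https://api.airtable.com/v0", "appBase", "My Table", parameters));
    }
}
```

Encoding each component before concatenation keeps characters such as spaces, braces and quotes in table names or formulas from producing an invalid URI, which is the part of `UriBuilder` that would otherwise be missed.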



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/QueryAirtableTableIT.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import okhttp3.HttpUrl;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class QueryAirtableTableIT {

Review Comment:
   It looks like this test can run as a unit test, which would provide 
validation as part of automated builds.
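Because the test drives the processor against a local `MockWebServer` instead of the live Airtable API, it needs no API key or network access, which is what makes it suitable as a unit test. Maven Surefire includes classes named like `*Test` by default, while `*IT` classes are normally left to the Failsafe plugin, so renaming the class (for example to `QueryAirtableTableTest`) would likely be enough. The JDK-only sketch below shows the same stub-server idea with `com.sun.net.httpserver.HttpServer` in place of OkHttp's `MockWebServer`. The class name and the canned `{"records":[]}` body are hypothetical.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical stub: a loopback HTTP server so a test never reaches the real Airtable API.
final class LocalAirtableStub {

    private LocalAirtableStub() {
    }

    // Starts a server on an ephemeral loopback port that answers every request with the given JSON.
    static HttpServer start(final String json) throws IOException {
        final HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        final byte[] body = json.getBytes(StandardCharsets.UTF_8);
        server.createContext("/", exchange -> {
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    // Performs a GET request and returns the response body decoded as UTF-8.
    static String get(final String url) throws IOException {
        final HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        try (InputStream in = connection.getInputStream()) {
            final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            final byte[] chunk = new byte[4096];
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            connection.disconnect();
        }
    }

    public static void main(final String[] args) throws IOException {
        final HttpServer server = start("{\"records\":[]}");
        try {
            final String apiUrl = "http://127.0.0.1:" + server.getAddress().getPort() + "/v0";
            System.out.println(get(apiUrl + "/appBase/table"));
        } finally {
            server.stop(0);
        }
    }
}
```

In the processor test, the equivalent step is pointing the `API URL` property at the mock server's address, so the whole run stays on the loopback interface.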



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.record.AirtableRecordSet;
+import org.apache.nifi.processors.airtable.record.AirtableJsonTreeRowRecordReaderFactory;
+import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.AirtableRestService.RateLimitExceededException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.JsonRecordReaderFactory;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API."
+        + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query."
+        + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
+})
+@DefaultSettings(yieldDuration = "35 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table Name or ID")
+            .description("The name or the ID of the Airtable table to be queried.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the table. Both the field's name and ID can be used.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+            .name("max-records-per-flow-file")
+            .displayName("Max Records Per Flow File")
+            .description("The maximum number of result records that will be included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all records"
+                    + " have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles"
+                    + " to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will"
+                    + " be committed, thus releasing the FlowFiles to the downstream relationship.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor METADATA_STRATEGY = new PropertyDescriptor.Builder()
+            .name("metadata-strategy")
+            .displayName("Metadata Strategy")
+            .description("Strategy to use for fetching record schema. Currently only 'Use JSON Record Reader' is supported."
+                    + " When Airtable Metadata API becomes more stable it will be possible to fetch the record schema through it.")
+            .required(true)
+            .defaultValue(MetadataStrategy.USE_JSON_RECORD_READER.name())
+            .allowableValues(MetadataStrategy.class)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_READER = new PropertyDescriptor.Builder()
+            .name("schema-reader")
+            .displayName("Schema Reader")
+            .description("JsonTreeReader service to use for fetching the schema of records returned by the Airtable REST API")
+            .dependsOn(METADATA_STRATEGY, MetadataStrategy.USE_JSON_RECORD_READER.name())
+            .identifiesControllerService(JsonRecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Service used for writing records returned by the Airtable REST API")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Web Client Service Provider to use for Airtable REST API requests")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful query.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_KEY,
+            BASE_ID,
+            TABLE_ID,
+            FIELDS,
+            CUSTOM_FILTER,
+            PAGE_SIZE,
+            MAX_RECORDS_PER_FLOW_FILE,
+            OUTPUT_BATCH_SIZE,
+            METADATA_STRATEGY,
+            SCHEMA_READER,
+            RECORD_WRITER,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
+
+    private static final String LAST_RECORD_FETCH_TIME = "last_record_fetch_time";
+    private static final int QUERY_LAG_SECONDS = 1;
+
+    private volatile AirtableRestService airtableRestService;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final String apiUrl = context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+        final String apiKey = context.getProperty(API_KEY).getValue();
+        final String baseId = context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue();
+        final String tableId = context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue();
+        final WebClientServiceProvider webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+        airtableRestService = new AirtableRestService(webClientServiceProvider.getWebClientService(), apiUrl, apiKey, baseId, tableId);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final JsonRecordReaderFactory schemaRecordReaderFactory = context.getProperty(SCHEMA_READER).asControllerService(JsonRecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Integer maxRecordsPerFlowFile = context.getProperty(MAX_RECORDS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSize = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+
+        final StateMap state;
+        try {
+            state = context.getStateManager().getState(Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to get cluster state", e);
+        }
+
+        final String lastRecordFetchDateTime = state.get(LAST_RECORD_FETCH_TIME);
+        final String currentRecordFetchDateTime = OffsetDateTime.now().minusSeconds(QUERY_LAG_SECONDS).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+
+        final AirtableGetRecordsParameters getRecordsParameters = buildGetRecordsParameters(context, lastRecordFetchDateTime, currentRecordFetchDateTime);
+        final byte[] recordsJson;
+        try {
+            recordsJson = airtableRestService.getRecords(getRecordsParameters);
+        } catch (RateLimitExceededException e) {
+            context.yield();
+            throw new ProcessException("REST API rate limit exceeded while fetching initial Airtable record set", e);
+        }
+
+        final FlowFile flowFile = session.create();
+        final Map<String, String> originalAttributes = flowFile.getAttributes();
+
+        final RecordSchema recordSchema;
+        final RecordSchema writerSchema;
+        try {
+            final ByteArrayInputStream recordsStream = new ByteArrayInputStream(recordsJson);
+            recordSchema = schemaRecordReaderFactory.createRecordReader(flowFile, recordsStream, getLogger()).getSchema();
+            writerSchema = writerFactory.getSchema(originalAttributes, recordSchema);
+        } catch (MalformedRecordException | IOException | SchemaNotFoundException e) {
+            throw new ProcessException("Couldn't get record schema", e);
+        }
+
+        session.remove(flowFile);
+
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        int totalRecordCount = 0;
+        final AirtableJsonTreeRowRecordReaderFactory recordReaderFactory = new AirtableJsonTreeRowRecordReaderFactory(getLogger(), recordSchema);
+        try (final AirtableRecordSet airtableRecordSet = new AirtableRecordSet(recordsJson, recordReaderFactory, airtableRestService, getRecordsParameters)) {
+            while (true) {
+                final AtomicInteger recordCountHolder = new AtomicInteger();
+                final Map<String, String> attributes = new HashMap<>();
+                FlowFile flowFileToAdd = session.create();
+                flowFileToAdd = session.write(flowFileToAdd, out -> {
+                    try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writerSchema, out, originalAttributes)) {
+                        final RecordSet recordSet = (maxRecordsPerFlowFile > 0 ? airtableRecordSet.limit(maxRecordsPerFlowFile) : airtableRecordSet);
+                        final WriteResult writeResult = writer.write(recordSet);
+
+                        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                        attributes.putAll(writeResult.getAttributes());
+
+                        recordCountHolder.set(writeResult.getRecordCount());
+                    } catch (final IOException e) {
+                        final Throwable cause = e.getCause();
+                        if (cause instanceof RateLimitExceededException) {
+                            context.yield();
+                            throw new ProcessException("REST API rate limit exceeded while reading Airtable records", cause);
+                        }
+                        throw new ProcessException("Couldn't read records from input", e);
+                    } catch (final SchemaNotFoundException e) {
+                        throw new ProcessException("Couldn't read records from input", e);
+                    }
+                });
+
+                flowFileToAdd = session.putAllAttributes(flowFileToAdd, attributes);
+
+                final int recordCount = recordCountHolder.get();
+                if (recordCount > 0) {
+                    totalRecordCount += recordCount;
+                    flowFiles.add(flowFileToAdd);
+                    if (outputBatchSize > 0 && flowFiles.size() == outputBatchSize) {
+                        transferFlowFiles(flowFiles, session, totalRecordCount);
+                        flowFiles.clear();
+                    }
+                    continue;
+                }
+
+                session.remove(flowFileToAdd);
+
+                if (maxRecordsPerFlowFile > 0 && outputBatchSize == 0) {
+                    fragmentFlowFiles(session, flowFiles);
+                }
+
+                transferFlowFiles(flowFiles, session, totalRecordCount);
+                break;
+            }
+        } catch (final IOException e) {
+            throw new ProcessException("Couldn't read records from input", e);
+        }
+
+        if (totalRecordCount == 0) {
+            return;
+        }
+
+        final Map<String, String> newState = new HashMap<>(state.toMap());
+        newState.put(LAST_RECORD_FETCH_TIME, currentRecordFetchDateTime);
+        try {
+            context.getStateManager().setState(newState, Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to update cluster state", e);
+        }
+    }
+
+    private AirtableGetRecordsParameters buildGetRecordsParameters(final ProcessContext context,
+            final String lastRecordFetchTime,
+            final String nowDateTimeString) {
+        final String fieldsProperty = context.getProperty(FIELDS).evaluateAttributeExpressions().getValue();
+        final String customFilter = context.getProperty(CUSTOM_FILTER).evaluateAttributeExpressions().getValue();
+        final Integer pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions().asInteger();
+
+        final AirtableGetRecordsParameters.Builder getRecordsParametersBuilder = new AirtableGetRecordsParameters.Builder();
+        if (lastRecordFetchTime != null) {
+            getRecordsParametersBuilder
+                    .modifiedAfter(lastRecordFetchTime)
+                    .modifiedBefore(nowDateTimeString);
+        }
+        if (fieldsProperty != null) {
+            getRecordsParametersBuilder.fields(Arrays.stream(fieldsProperty.split(",")).map(String::trim).collect(Collectors.toList()));
+        }
+        getRecordsParametersBuilder.customFilter(customFilter);
+        if (pageSize != null && pageSize > 0) {
+            getRecordsParametersBuilder.pageSize(pageSize);
+        }
+
+        return getRecordsParametersBuilder.build();
+    }
+
+    private void fragmentFlowFiles(final ProcessSession session, final List<FlowFile> flowFiles) {
+        final String fragmentIdentifier = UUID.randomUUID().toString();
+        for (int i = 0; i < flowFiles.size(); i++) {
+            final Map<String, String> fragmentAttributes = new HashMap<>();
+            fragmentAttributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
+            fragmentAttributes.put(FRAGMENT_INDEX.key(), String.valueOf(i));
+            fragmentAttributes.put(FRAGMENT_COUNT.key(), String.valueOf(flowFiles.size()));
+
+            flowFiles.set(i, session.putAllAttributes(flowFiles.get(i), fragmentAttributes));
+        }
+    }
+
+    private void transferFlowFiles(final List<FlowFile> flowFiles, final ProcessSession session, final int totalRecordCount) {
+        session.transfer(flowFiles, REL_SUCCESS);
+        session.adjustCounter("Records Processed", totalRecordCount, false);
+        final String flowFilesAsString = flowFiles.stream().map(FlowFile::toString).collect(Collectors.joining(", ", "[", "]"));
+        getLogger().info("Successfully written {} records for flow files {}", totalRecordCount, flowFilesAsString);

Review Comment:
   Recommend adjusting the wording:
   ```suggestion
           getLogger().info("Transferred FlowFiles [{}] Records [{}]", flowFilesAsString, totalRecordCount);
   ```
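One detail worth checking with the suggested wording: `flowFilesAsString` is built with `Collectors.joining(", ", "[", "]")`, so it already carries brackets, and wrapping it in `[{}]` would log them twice. The sketch below compares both joining styles. `String.format` stands in for the logger's `{}` placeholder substitution and plain strings stand in for `FlowFile` objects. `TransferLogSketch` and `message` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: compares the two joining styles for the suggested log message.
// String.format stands in for ComponentLog "{}" substitution; strings stand in for FlowFiles.
final class TransferLogSketch {

    private TransferLogSketch() {
    }

    static String message(final List<String> flowFiles, final int totalRecordCount, final boolean bracketed) {
        final String flowFilesAsString = bracketed
                ? flowFiles.stream().collect(Collectors.joining(", ", "[", "]")) // as in the current code
                : String.join(", ", flowFiles);                                 // no prefix or suffix
        return String.format("Transferred FlowFiles [%s] Records [%d]", flowFilesAsString, totalRecordCount);
    }

    public static void main(final String[] args) {
        final List<String> flowFiles = Arrays.asList("FlowFile-1", "FlowFile-2");
        // Transferred FlowFiles [[FlowFile-1, FlowFile-2]] Records [42]
        System.out.println(message(flowFiles, 42, true));
        // Transferred FlowFiles [FlowFile-1, FlowFile-2] Records [42]
        System.out.println(message(flowFiles, 42, false));
    }
}
```

Dropping the prefix and suffix from the collector (or from the format string) keeps a single pair of brackets around the FlowFile list.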



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableGetRecordsParameters.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+public class AirtableGetRecordsParameters {
+
+    private final List<String> fields;
+    private final Optional<String> modifiedAfter;
+    private final Optional<String> modifiedBefore;
+    private final Optional<String> customFilter;
+    private final Optional<String> offset;
+    private final OptionalInt pageSize;
+
+    public AirtableGetRecordsParameters(final List<String> fields,
+            final Optional<String> modifiedAfter,
+            final Optional<String> modifiedBefore,
+            final Optional<String> customFilter,
+            final Optional<String> offset,
+            final OptionalInt pageSize) {
+        this.fields = Objects.requireNonNull(fields);
+        this.modifiedAfter = modifiedAfter;
+        this.modifiedBefore = modifiedBefore;
+        this.customFilter = customFilter;
+        this.offset = offset;
+        this.pageSize = pageSize;
+    }
+
+    public List<String> getFields() {
+        return fields;
+    }
+
+    public Optional<String> getModifiedAfter() {
+        return modifiedAfter;
+    }
+
+    public Optional<String> getModifiedBefore() {
+        return modifiedBefore;
+    }
+
+    public Optional<String> getCustomFilter() {
+        return customFilter;
+    }
+
+    public Optional<String> getOffset() {
+        return offset;
+    }
+
+    public OptionalInt getPageSize() {
+        return pageSize;
+    }
+
+    public AirtableGetRecordsParameters withOffset(final String offset) {
+        return new AirtableGetRecordsParameters(fields, modifiedAfter, modifiedBefore, customFilter, Optional.of(offset), pageSize);
+    }
+
+    public static class Builder {
+        private List<String> fields = null;
+        private String modifiedAfter = null;
+        private String modifiedBefore = null;
+        private String customFilter = null;
+        private String offset = null;

Review Comment:
   Declaring `null` should be unnecessary, since it is the default value.
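Java gives every field a default value (`null` for references, `0` for numeric primitives, `false` for `boolean`), so the explicit `= null` initializers add nothing. Local variables, by contrast, get no default and must be assigned before use. The sketch below mirrors the shape of the builder. The `Integer pageSize` field and the `allUnset()` helper are hypothetical, since the rest of the builder is not shown here.

```java
import java.util.List;

// Sketch mirroring AirtableGetRecordsParameters.Builder without the explicit "= null" initializers.
final class BuilderDefaultsSketch {

    private BuilderDefaultsSketch() {
    }

    static final class Builder {
        private List<String> fields;     // defaults to null
        private String modifiedAfter;    // defaults to null
        private String modifiedBefore;   // defaults to null
        private String customFilter;     // defaults to null
        private String offset;           // defaults to null
        private Integer pageSize;        // hypothetical boxed type, so also null

        Builder customFilter(final String customFilter) {
            this.customFilter = customFilter;
            return this;
        }

        // Hypothetical helper: true when no field has been assigned yet.
        boolean allUnset() {
            return fields == null && modifiedAfter == null && modifiedBefore == null
                    && customFilter == null && offset == null && pageSize == null;
        }
    }

    public static void main(final String[] args) {
        System.out.println(new Builder().allUnset());                   // true
        System.out.println(new Builder().customFilter("x").allUnset()); // false
    }
}
```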



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.record.AirtableRecordSet;
+import org.apache.nifi.processors.airtable.record.AirtableJsonTreeRowRecordReaderFactory;
+import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.AirtableRestService.RateLimitExceededException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.JsonRecordReaderFactory;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API."
+        + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query."
+        + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
+})
+@DefaultSettings(yieldDuration = "35 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table Name or ID")

Review Comment:
   Recommend calling this property `Table ID`, since the description indicates 
that the ID or name is allowed.
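The `@Stateful` description in the hunk above can be modeled as a moving time window. This is a hedged illustration, not the processor's implementation: a plain `Map` stands in for NiFi's cluster-scoped state, and the class `IncrementalWindow` and the state key `last.query.time` are hypothetical. The upper bound of each window is only written back after a successful run, so a failed run is retried from the same lower bound.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of incremental loading by last modified time.
// A Map stands in for NiFi's cluster-scoped state; the key name is an assumption.
final class IncrementalWindow {
    static final String LAST_QUERY_TIME_KEY = "last.query.time";

    // Empty on the first run, meaning "return all records in the table".
    final Optional<String> modifiedAfter;
    // Upper bound of this window; it becomes the lower bound of the next one.
    final String modifiedBefore;

    private IncrementalWindow(Optional<String> modifiedAfter, String modifiedBefore) {
        this.modifiedAfter = modifiedAfter;
        this.modifiedBefore = modifiedBefore;
    }

    // Builds the next query window from the stored state and the current time.
    static IncrementalWindow next(Map<String, String> state, OffsetDateTime now) {
        String upperBound = now.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        return new IncrementalWindow(Optional.ofNullable(state.get(LAST_QUERY_TIME_KEY)), upperBound);
    }

    // Call only after the records were written successfully.
    void commit(Map<String, String> state) {
        state.put(LAST_QUERY_TIME_KEY, modifiedBefore);
    }
}
```

In the processor, the two bounds would presumably map onto the `modifiedAfter` and `modifiedBefore` values carried by `AirtableGetRecordsParameters`.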



##########
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java:
##########
@@ -33,7 +34,7 @@
 import java.util.List;
 import java.util.Map;
 
-public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
+public class MockRecordParser extends AbstractControllerService implements JsonRecordReaderFactory, RecordReaderFactory {

Review Comment:
   The `MockRecordParser` does not have any JSON-specific elements, so this 
change should be reverted.



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/QueryAirtableTableIT.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import okhttp3.HttpUrl;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class QueryAirtableTableIT {
+
+    public static final String RECORDS_JSON_BODY = "{\"records\":[{"
+            + "\"id\":\"recabcdefghijklmn\","
+            + "\"createdTime\":\"1970-00-01T00:00:00.000Z\","
+            + "\"fields\":{\"foo\":\"bar\"}}]}";
+    public static final String RECORDS_WITH_OFFSET_JSON_BODY = "{\"records\":[{"
+            + "\"id\":\"recabcdefghijklmn\","
+            + "\"createdTime\":\"1970-00-01T00:00:00.000Z\","
+            + "\"fields\":{\"foo\":\"bar\"}}],"
+            + "\"offset\":\"ofsabcdefghijklmn\"}";
+    public static final String EXPECTED_RECORD_CONTENT =
+            "\"recabcdefghijklmn\",\"1970-00-01T00:00:00.000Z\",\"MapRecord[{foo=bar}]\"\n";
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        final Processor queryAirtableTable = new QueryAirtableTable();
+
+        runner = TestRunners.newTestRunner(queryAirtableTable);
+
+        final RecordReaderFactory schemaReader = new JsonTreeReader();
+        runner.addControllerService("reader", schemaReader);
+        runner.setProperty(schemaReader, JsonTreeReader.STARTING_FIELD_NAME, "records");
+        runner.setProperty(schemaReader, JsonTreeReader.STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_FIELD.getValue());
+        runner.setProperty(schemaReader, JsonTreeReader.SCHEMA_APPLICATION_STRATEGY, SchemaApplicationStrategy.SELECTED_PART.getValue());
+        runner.enableControllerService(schemaReader);
+
+        final RecordSetWriterFactory writer = new MockRecordWriter();
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        final WebClientServiceProvider webClientServiceProvider = new StandardWebClientServiceProvider();
+        runner.addControllerService("webClientService", webClientServiceProvider);
+        runner.enableControllerService(webClientServiceProvider);
+
+        runner.setProperty(QueryAirtableTable.API_KEY, "???");
+        runner.setProperty(QueryAirtableTable.BASE_ID, "appabcdefghijklmn");
+        runner.setProperty(QueryAirtableTable.TABLE_ID, "tblabcdefghijklmn");
+        runner.setProperty(QueryAirtableTable.SCHEMA_READER, schemaReader.getIdentifier());
+        runner.setProperty(QueryAirtableTable.RECORD_WRITER, writer.getIdentifier());
+        runner.setProperty(QueryAirtableTable.WEB_CLIENT_SERVICE_PROVIDER, webClientServiceProvider.getIdentifier());
+
+    }
+
+    @AfterEach
+    void tearDown() {
+        runner.shutdown();
+    }
+
+    @Test
+    void retrievesAndWritesRecords() throws Exception {
+        try (final MockWebServer server = new MockWebServer()) {
+            server.enqueue(new MockResponse().setBody(RECORDS_JSON_BODY));
+
+            server.start();
+            final HttpUrl httpUrl = server.url("/v0/airtable");
+
+            runner.setProperty(QueryAirtableTable.API_URL, httpUrl.toString());
+            runner.run();
+
+            final List<MockFlowFile> results = runner.getFlowFilesForRelationship(QueryAirtableTable.REL_SUCCESS);
+            assertEquals(1, results.size());
+            final MockFlowFile flowFile = results.get(0);
+            assertEquals("1", flowFile.getAttribute("record.count"));
+            final String content = flowFile.getContent();
+            assertEquals(EXPECTED_RECORD_CONTENT, content);
+        }
+    }
+
+    @Test
+    void retrievesAndWritesPagedRecords() throws Exception {
+        try (final MockWebServer server = new MockWebServer()) {
+            server.enqueue(new MockResponse().setBody(RECORDS_WITH_OFFSET_JSON_BODY));
+            server.enqueue(new MockResponse().setBody(RECORDS_JSON_BODY));
+
+            server.start();
+            final HttpUrl httpUrl = server.url("/v0/airtable");
+
+            runner.setProperty(QueryAirtableTable.API_URL, httpUrl.toString());
+            runner.run();
+
+            final List<MockFlowFile> results = runner.getFlowFilesForRelationship(QueryAirtableTable.REL_SUCCESS);
+            assertEquals(1, results.size());
+            final MockFlowFile flowFile = results.get(0);
+            assertEquals("2", flowFile.getAttribute("record.count"));
+            final String content = flowFile.getContent();
+            assertEquals(EXPECTED_RECORD_CONTENT + EXPECTED_RECORD_CONTENT, content);
+        }
+    }
+
+    @Test
+    void retrievesAndWritesPagedRecordsInMultipleFlowFiles() throws Exception {
+        try (final MockWebServer server = new MockWebServer()) {
+            server.enqueue(new MockResponse().setBody(RECORDS_WITH_OFFSET_JSON_BODY));
+            server.enqueue(new MockResponse().setBody(RECORDS_JSON_BODY));
+
+            server.start();
+            final HttpUrl httpUrl = server.url("/v0/airtable");

Review Comment:
   Recommend declaring `/v0/airtable` as a static member variable and reusing 
it across all test methods.
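A minimal sketch of the suggested refactoring, using only the JDK so it runs without OkHttp: the path is declared once as a `static final` constant and every test resolves it the same way. `AirtableTestUrls` and `apiUrl` are hypothetical names introduced for illustration.

```java
import java.net.URI;

// Hypothetical holder for the shared test path.
final class AirtableTestUrls {
    // Declared once instead of repeating "/v0/airtable" in each test method.
    static final String API_PATH = "/v0/airtable";

    private AirtableTestUrls() {
    }

    // Resolves the shared absolute path against a mock server's base URL, e.g. "http://localhost:8080/".
    static String apiUrl(String serverBaseUrl) {
        return URI.create(serverBaseUrl).resolve(API_PATH).toString();
    }
}
```

In `QueryAirtableTableIT` itself, the helper is unnecessary: each test could call `server.url(API_PATH)` directly, so the literal appears only in the constant declaration.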



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.record.AirtableRecordSet;
+import org.apache.nifi.processors.airtable.record.AirtableJsonTreeRowRecordReaderFactory;
+import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.AirtableRestService.RateLimitExceededException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.JsonRecordReaderFactory;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API."
+        + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query."
+        + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
+})
+@DefaultSettings(yieldDuration = "35 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table Name or ID")
+            .description("The name or the ID of the Airtable table to be queried.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the table. Both the field's name and ID can be used.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+            .name("max-records-per-flow-file")
+            .displayName("Max Records Per Flow File")

Review Comment:
   ```suggestion
               .displayName("Max Records Per FlowFile")
   ```
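The `Max Records Per FlowFile` property is documented as breaking large result sets into multiple FlowFiles, with "If the value specified is zero, then all records are returned in a single FlowFile." A hedged sketch of that rule: each inner list stands for one FlowFile's records, and `FlowFilePartitioner` is a hypothetical name. Returning no FlowFile for an empty result is an assumption; the PR's behavior for empty results is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "Max Records Per FlowFile" rule: each inner list is one FlowFile.
final class FlowFilePartitioner {
    private FlowFilePartitioner() {
    }

    static <T> List<List<T>> partition(List<T> records, int maxRecordsPerFlowFile) {
        if (maxRecordsPerFlowFile < 0) {
            throw new IllegalArgumentException("maxRecordsPerFlowFile must be zero or positive");
        }
        List<List<T>> flowFiles = new ArrayList<>();
        if (records.isEmpty()) {
            return flowFiles; // assumption: no records, no FlowFile
        }
        if (maxRecordsPerFlowFile == 0) {
            flowFiles.add(new ArrayList<>(records)); // zero: everything in a single FlowFile
            return flowFiles;
        }
        for (int start = 0; start < records.size(); start += maxRecordsPerFlowFile) {
            int end = Math.min(start + maxRecordsPerFlowFile, records.size());
            flowFiles.add(new ArrayList<>(records.subList(start, end)));
        }
        return flowFiles;
    }
}
```

The static imports of `FRAGMENT_ID`, `FRAGMENT_INDEX`, and `FRAGMENT_COUNT` in the processor suggest that split FlowFiles are tagged with fragment attributes so downstream processors can reassemble the set.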



##########
nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.airtable;
+
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.airtable.record.AirtableRecordSet;
+import org.apache.nifi.processors.airtable.record.AirtableJsonTreeRowRecordReaderFactory;
+import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
+import org.apache.nifi.processors.airtable.service.AirtableRestService;
+import org.apache.nifi.processors.airtable.service.AirtableRestService.RateLimitExceededException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.JsonRecordReaderFactory;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+@PrimaryNodeOnly
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"airtable", "query", "database"})
+@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records."
+        + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API."
+        + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading."
+        + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time."
+        + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query."
+        + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
+})
+@DefaultSettings(yieldDuration = "35 sec")
+public class QueryAirtableTable extends AbstractProcessor {
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. https://api.airtable.com/v0).")
+            .defaultValue(API_V0_BASE_URL)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("The REST API key to use in queries. Should be generated on Airtable's account page.")
+            .required(true)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder()
+            .name("base-id")
+            .displayName("Base ID")
+            .description("The ID of the Airtable base to be queried.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder()
+            .name("table-id")
+            .displayName("Table Name or ID")
+            .description("The name or the ID of the Airtable table to be queried.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fields")
+            .displayName("Fields")
+            .description("Comma-separated list of fields to query from the table. Both the field's name and ID can be used.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CUSTOM_FILTER = new PropertyDescriptor.Builder()
+            .name("custom-filter")
+            .displayName("Custom Filter")
+            .description("Filter records by Airtable's formulas.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("Number of records to be fetched in a page. Should be between 0 and 100 inclusively.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createLongValidator(0, 100, true))
+            .build();
+
+    static final PropertyDescriptor MAX_RECORDS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+            .name("max-records-per-flow-file")
+            .displayName("Max Records Per Flow File")
+            .description("The maximum number of result records that will be included in a single FlowFile. This will allow you to break up very large"
+                    + " result sets into multiple FlowFiles. If the value specified is zero, then all records are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all records"
+                    + " have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles"
+                    + " to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will"
+                    + " be committed, thus releasing the FlowFiles to the downstream relationship.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor METADATA_STRATEGY = new PropertyDescriptor.Builder()
+            .name("metadata-strategy")
+            .displayName("Metadata Strategy")
+            .description("Strategy to use for fetching record schema. Currently only 'Use JSON Record Reader' is supported."
+                    + " When Airtable Metadata API becomes more stable it will be possible to fetch the record schema through it.")
+            .required(true)
+            .defaultValue(MetadataStrategy.USE_JSON_RECORD_READER.name())
+            .allowableValues(MetadataStrategy.class)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_READER = new PropertyDescriptor.Builder()
+            .name("schema-reader")
+            .displayName("Schema Reader")
+            .description("JsonTreeReader service to use for fetching the schema of records returned by the Airtable REST API")
+            .dependsOn(METADATA_STRATEGY, MetadataStrategy.USE_JSON_RECORD_READER.name())
+            .identifiesControllerService(JsonRecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Service used for writing records returned by the Airtable REST API")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder()
+            .name("web-client-service-provider")
+            .displayName("Web Client Service Provider")
+            .description("Web Client Service Provider to use for Airtable REST API requests")
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful query.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_KEY,
+            BASE_ID,
+            TABLE_ID,
+            FIELDS,
+            CUSTOM_FILTER,
+            PAGE_SIZE,
+            MAX_RECORDS_PER_FLOW_FILE,
+            OUTPUT_BATCH_SIZE,
+            METADATA_STRATEGY,
+            SCHEMA_READER,
+            RECORD_WRITER,
+            WEB_CLIENT_SERVICE_PROVIDER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
+
+    private static final String LAST_RECORD_FETCH_TIME = "last_record_fetch_time";
+    private static final int QUERY_LAG_SECONDS = 1;
+
+    private volatile AirtableRestService airtableRestService;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final String apiUrl = context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+        final String apiKey = context.getProperty(API_KEY).getValue();
+        final String baseId = context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue();
+        final String tableId = context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue();
+        final WebClientServiceProvider webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
+        airtableRestService = new AirtableRestService(webClientServiceProvider.getWebClientService(), apiUrl, apiKey, baseId, tableId);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final JsonRecordReaderFactory schemaRecordReaderFactory = context.getProperty(SCHEMA_READER).asControllerService(JsonRecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Integer maxRecordsPerFlowFile = context.getProperty(MAX_RECORDS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSize = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+
+        final StateMap state;
+        try {
+            state = context.getStateManager().getState(Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to get cluster state", e);
+        }
+
+        final String lastRecordFetchDateTime = state.get(LAST_RECORD_FETCH_TIME);
+        final String currentRecordFetchDateTime = OffsetDateTime.now().minusSeconds(QUERY_LAG_SECONDS).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+
+        final AirtableGetRecordsParameters getRecordsParameters = buildGetRecordsParameters(context, lastRecordFetchDateTime, currentRecordFetchDateTime);
+        final byte[] recordsJson;
+        try {
+            recordsJson = airtableRestService.getRecords(getRecordsParameters);
+        } catch (RateLimitExceededException e) {
+            context.yield();
+            throw new ProcessException("REST API rate limit exceeded while fetching initial Airtable record set", e);
+        }
+
+        final FlowFile flowFile = session.create();
+        final Map<String, String> originalAttributes = flowFile.getAttributes();
+
+        final RecordSchema recordSchema;
+        final RecordSchema writerSchema;
+        try {
+            final ByteArrayInputStream recordsStream = new ByteArrayInputStream(recordsJson);
+            recordSchema = schemaRecordReaderFactory.createRecordReader(flowFile, recordsStream, getLogger()).getSchema();
+            writerSchema = writerFactory.getSchema(originalAttributes, recordSchema);
+        } catch (MalformedRecordException | IOException | SchemaNotFoundException e) {
+            throw new ProcessException("Couldn't get record schema", e);
+        }
+
+        session.remove(flowFile);
+
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        int totalRecordCount = 0;
+        final AirtableJsonTreeRowRecordReaderFactory recordReaderFactory = new AirtableJsonTreeRowRecordReaderFactory(getLogger(), recordSchema);
+        try (final AirtableRecordSet airtableRecordSet = new AirtableRecordSet(recordsJson, recordReaderFactory, airtableRestService, getRecordsParameters)) {

Review Comment:
   It looks like this approach needs to be re-evaluated. Reading the JSON byte 
array to determine the schema, and then reading it again to process the records 
is not ideal for memory or performance. If it is not possible to infer the 
schema after parsing the first record using existing components, then it would 
be worth evaluating a new implementation that is capable of performing both 
actions without requiring multiple reads. This approach should also allow 
reading the JSON as a stream, instead of a byte array.
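The single-pass idea suggested above can be sketched in plain Java. The sketch uses ordinary collections, so it runs without NiFi or Jackson on the classpath. It assumes each Airtable page can be modeled as a list of records plus the `offset` token that the List Records endpoint returns when more pages remain.

The following names are hypothetical stand-ins for `AirtableRestService`, a streaming `RecordReader`, and `RecordSchema`. They are not existing NiFi APIs:

* `Page`, `PagedRecordIterator`, `inferSchema`, and `processSinglePass`
* the `fetchPage` function
* the `Map<String, String>` schema

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical sketch: plain collections stand in for NiFi's RecordReader and RecordSchema.
class SinglePassAirtableSketch {

    // One page of results plus the offset token for the next page (null on the last page).
    static final class Page {
        final List<Map<String, Object>> records;
        final String nextOffset;

        Page(List<Map<String, Object>> records, String nextOffset) {
            this.records = records;
            this.nextOffset = nextOffset;
        }
    }

    // Fetches pages lazily, so only the current page is held in memory.
    static final class PagedRecordIterator implements Iterator<Map<String, Object>> {
        private final Function<String, Page> fetchPage; // hypothetical: offset (null = first page) -> page
        private Iterator<Map<String, Object>> current = Collections.emptyIterator();
        private String nextOffset;
        private boolean started;

        PagedRecordIterator(Function<String, Page> fetchPage) {
            this.fetchPage = fetchPage;
        }

        @Override
        public boolean hasNext() {
            // Keep requesting pages until one has records or no further page exists
            while (!current.hasNext() && (!started || nextOffset != null)) {
                Page page = fetchPage.apply(nextOffset);
                started = true;
                nextOffset = page.nextOffset;
                current = page.records.iterator();
            }
            return current.hasNext();
        }

        @Override
        public Map<String, Object> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }
    }

    // Naive inference: maps each field name to the Java type name of its value in one record.
    static Map<String, String> inferSchema(Map<String, Object> record) {
        Map<String, String> schema = new LinkedHashMap<>();
        record.forEach((name, value) ->
                schema.put(name, value == null ? "null" : value.getClass().getSimpleName()));
        return schema;
    }

    // Reads every record exactly once: the first record yields the schema and is then
    // passed to the writer like all the others. Returns an empty schema when there are no records.
    static Map<String, String> processSinglePass(Iterator<Map<String, Object>> records,
                                                 BiConsumer<Map<String, String>, Map<String, Object>> writer) {
        if (!records.hasNext()) {
            return Collections.emptyMap();
        }
        Map<String, Object> first = records.next();
        Map<String, String> schema = inferSchema(first);
        writer.accept(schema, first);
        while (records.hasNext()) {
            writer.accept(schema, records.next());
        }
        return schema;
    }
}
```

The first record both determines the schema and is written, so the response is never parsed twice. Only the current page is held in memory.

Inferring the schema from the first record alone is a simplification. A field that is empty or missing in that record would need a fallback, such as widening the schema as later records arrive. In the processor, `fetchPage` would presumably wrap the REST call and parse the response body incrementally (for example with Jackson's `JsonParser`) rather than materializing a `byte[]`.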



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
