turcsanyip commented on code in PR #6350: URL: https://github.com/apache/nifi/pull/6350#discussion_r966180622
########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import 
org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult; +import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; +import org.apache.nifi.processors.airtable.service.RateLimitExceededException; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@TriggerWhenEmpty +@Tags({"airtable", "query", "database"}) +@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records." + + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API." 
+ + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.") Review Comment: ```suggestion + " This processor is intended to be run on the Primary Node only.") ``` ########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/resources/docs/org.apache.nifi.processors.airtable.QueryAirtableTable/additionalDetails.html: ########## @@ -0,0 +1,68 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<!DOCTYPE html> +<html lang="en" xmlns="http://www.w3.org/1999/html"> +<head> + <meta charset="utf-8"/> + <title>QueryAirtableTable</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/> + <style> +h2 {margin-top: 4em} +h3 {margin-top: 3em} +td {text-align: left} + </style> +</head> + +<body> + +<h1>QueryAirtableTable</h1> Review Comment: The Additional Details documentation contains a couple of references to record processing. Please remove/rephrase them. 
########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import 
org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult; +import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; +import org.apache.nifi.processors.airtable.service.RateLimitExceededException; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@TriggerWhenEmpty +@Tags({"airtable", "query", "database"}) +@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records." + + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API." 
+ + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading." + + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time." + + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query." + + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Records Per Flow File' is set then all FlowFiles from the same query result set " Review Comment: 'Max Records Per Flow File' property has been renamed to 'Max Records Per FlowFile'. Please adjust `@WritesAttribute` tags accordingly. ########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/pom.xml: ########## @@ -0,0 +1,107 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>nifi-airtable-bundle</artifactId> + <groupId>org.apache.nifi</groupId> + <version>1.18.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>nifi-airtable-processors</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + <version>1.18.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-services</artifactId> + <version>1.18.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> Review Comment: These dependencies are not used anymore because record processing has been reverted. Also `nifi-mock-record-utils` and `nifi-schema-registry-service-api` test dependencies below. 
########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import 
org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult; +import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; +import org.apache.nifi.processors.airtable.service.RateLimitExceededException; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@TriggerWhenEmpty +@Tags({"airtable", "query", "database"}) +@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records." + + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API." 
+ + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading." + + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time." + + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query." + + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Records Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Records Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), Review Comment: 'Output Batch Size' is not used in this processor so the reference should be removed. 
########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import 
org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult; +import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; +import org.apache.nifi.processors.airtable.service.RateLimitExceededException; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@TriggerWhenEmpty +@Tags({"airtable", "query", "database"}) +@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records." + + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API." 
+ + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading." + + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time." + + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query." + + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Records Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Records Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Records Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. 
This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " Review Comment: ```suggestion + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " ``` ########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableTableRetriever.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.airtable.parse; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator.Feature; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; + +public class AirtableTableRetriever { + + static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory() + .configure(Feature.AUTO_CLOSE_JSON_CONTENT, false); + + final AirtableRestService airtableRestService; + final AirtableGetRecordsParameters getRecordsParameters; + final Integer maxRecordsPerFlowFile; + + public AirtableTableRetriever(final AirtableRestService airtableRestService, + final AirtableGetRecordsParameters getRecordsParameters, + final Integer maxRecordsPerFlowFile) { + this.airtableRestService = airtableRestService; + this.getRecordsParameters = getRecordsParameters; + this.maxRecordsPerFlowFile = maxRecordsPerFlowFile; + } + + public AirtableRetrieveTableResult retrieveAll(final ProcessSession session) throws IOException { + int totalRecordCount = 0; + final List<FlowFile> flowFiles = new ArrayList<>(); + AirtableRetrievePageResult retrievePageResult = null; + do { + retrievePageResult = retrieveNextPage(session, Optional.ofNullable(retrievePageResult)); + totalRecordCount += retrievePageResult.getParsedRecordCount(); + flowFiles.addAll(retrievePageResult.getFlowFiles()); + } while (retrievePageResult.getNextOffset().isPresent()); 
+ + retrievePageResult.getOngoingRecordSetFlowFileWriter() + .map(writer -> { + try { + return writer.closeRecordSet(session); + } catch (IOException e) { + throw new ProcessException("Failed to close Airtable record writer", e); + } + }) + .ifPresent(flowFiles::add); + return new AirtableRetrieveTableResult(flowFiles, totalRecordCount); + } + + private AirtableRetrievePageResult retrieveNextPage(final ProcessSession session, final Optional<AirtableRetrievePageResult> previousPagePageResult) Review Comment: Possible typo: `previousPageResult` ? ########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.airtable; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; 
+import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult; +import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; +import org.apache.nifi.processors.airtable.service.RateLimitExceededException; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@TriggerWhenEmpty +@Tags({"airtable", "query", "database"}) +@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records." + + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API." + + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading." + + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time." + + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query." 
+ + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Records Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Records Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Records Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced"), +}) +@DefaultSettings(yieldDuration = "35 sec") +public class QueryAirtableTable extends AbstractProcessor { + + static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() + .name("api-url") + .displayName("API URL") + .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. 
https://api.airtable.com/v0).") + .defaultValue(API_V0_BASE_URL) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder() + .name("api-key") + .displayName("API Key") + .description("The REST API key to use in queries. Should be generated on Airtable's account page.") + .required(true) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder() + .name("base-id") + .displayName("Base ID") + .description("The ID of the Airtable base to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder() + .name("table-id") + .displayName("Table ID") + .description("The name or the ID of the Airtable table to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() + .name("fields") + .displayName("Fields") + .description("Comma-separated list of fields to query from the table. 
Both the field's name and ID can be used.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor CUSTOM_FILTER = new PropertyDescriptor.Builder() + .name("custom-filter") + .displayName("Custom Filter") + .description("Filter records by Airtable's formulas.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder() + .name("page-size") + .displayName("Page Size") + .description("Number of records to be fetched in a page. Should be between 0 and 100 inclusively.") + .defaultValue("0") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.createLongValidator(0, 100, true)) + .build(); + + static final PropertyDescriptor MAX_RECORDS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("max-records-per-flow-file") + .displayName("Max Records Per FlowFile") + .description("The maximum number of result records that will be included in a single FlowFile. This will allow you to break up very large" + + " result sets into multiple FlowFiles. 
If the value specified is zero, then all records are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Web Client Service Provider to use for Airtable REST API requests") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful query.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + API_URL, + API_KEY, + BASE_ID, + TABLE_ID, + FIELDS, + CUSTOM_FILTER, + PAGE_SIZE, + MAX_RECORDS_PER_FLOW_FILE, + WEB_CLIENT_SERVICE_PROVIDER + )); + + private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS); + + private static final String LAST_RECORD_FETCH_TIME = "last_record_fetch_time"; + private static final int QUERY_LAG_SECONDS = 1; + + private volatile AirtableRestService airtableRestService; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + final String apiUrl = context.getProperty(API_URL).evaluateAttributeExpressions().getValue(); + final String apiKey = context.getProperty(API_KEY).getValue(); + final String baseId = context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue(); + final String tableId = context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue(); + 
final WebClientServiceProvider webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + airtableRestService = new AirtableRestService(webClientServiceProvider, apiUrl, apiKey, baseId, tableId); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final Integer maxRecordsPerFlowFile = context.getProperty(MAX_RECORDS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + + final StateMap state; + try { + state = context.getStateManager().getState(Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Failed to get cluster state", e); + } + + final String lastRecordFetchDateTime = state.get(LAST_RECORD_FETCH_TIME); + final String currentRecordFetchDateTime = OffsetDateTime.now() + .minusSeconds(QUERY_LAG_SECONDS) + .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + + final AirtableGetRecordsParameters getRecordsParameters = buildGetRecordsParameters(context, lastRecordFetchDateTime, currentRecordFetchDateTime); + final AirtableRetrieveTableResult retrieveTableResult; + try { + final AirtableTableRetriever tableRetriever = new AirtableTableRetriever(airtableRestService, getRecordsParameters, maxRecordsPerFlowFile); + retrieveTableResult = tableRetriever.retrieveAll(session); + } catch (IOException e) { + throw new ProcessException("Failed to read Airtable records", e); + } catch (RateLimitExceededException e) { + context.yield(); + throw new ProcessException("Airtable REST API rate limit exceeded while reading records", e); + } + + final List<FlowFile> flowFiles = retrieveTableResult.getFlowFiles(); + if (flowFiles.isEmpty()) { + return; + } + + if (maxRecordsPerFlowFile != null && maxRecordsPerFlowFile > 0) { + fragmentFlowFiles(session, flowFiles); + } + transferFlowFiles(session, flowFiles, retrieveTableResult.getTotalRecordCount()); + + final Map<String, String> newState = new 
HashMap<>(state.toMap()); + newState.put(LAST_RECORD_FETCH_TIME, currentRecordFetchDateTime); + try { + context.getStateManager().setState(newState, Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Failed to update cluster state", e); + } + } + + private AirtableGetRecordsParameters buildGetRecordsParameters(final ProcessContext context, + final String lastRecordFetchTime, + final String nowDateTimeString) { + final String fieldsProperty = context.getProperty(FIELDS).evaluateAttributeExpressions().getValue(); + final String customFilter = context.getProperty(CUSTOM_FILTER).evaluateAttributeExpressions().getValue(); + final Integer pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions().asInteger(); + + final AirtableGetRecordsParameters.Builder getRecordsParametersBuilder = new AirtableGetRecordsParameters.Builder(); + if (lastRecordFetchTime != null) { + getRecordsParametersBuilder + .modifiedAfter(lastRecordFetchTime) + .modifiedBefore(nowDateTimeString); + } + if (fieldsProperty != null) { + getRecordsParametersBuilder.fields(Arrays.stream(fieldsProperty.split(",")).map(String::trim).collect(Collectors.toList())); + } + getRecordsParametersBuilder.customFilter(customFilter); + if (pageSize != null && pageSize > 0) { + getRecordsParametersBuilder.pageSize(pageSize); + } + + return getRecordsParametersBuilder.build(); + } + + private void fragmentFlowFiles(final ProcessSession session, final List<FlowFile> flowFiles) { + final String fragmentIdentifier = UUID.randomUUID().toString(); + for (int i = 0; i < flowFiles.size(); i++) { + final Map<String, String> fragmentAttributes = new HashMap<>(); + fragmentAttributes.put(FRAGMENT_ID.key(), fragmentIdentifier); + fragmentAttributes.put(FRAGMENT_INDEX.key(), String.valueOf(i)); + fragmentAttributes.put(FRAGMENT_COUNT.key(), String.valueOf(flowFiles.size())); + + flowFiles.set(i, session.putAllAttributes(flowFiles.get(i), fragmentAttributes)); + } + } + + private void 
transferFlowFiles(final ProcessSession session, final List<FlowFile> flowFiles, final int totalRecordCount) { + session.transfer(flowFiles, REL_SUCCESS); + session.adjustCounter("Records Processed", totalRecordCount, false); + final String flowFilesAsString = flowFiles.stream().map(FlowFile::toString).collect(Collectors.joining(", ", "[", "]")); + getLogger().info("Transferred FlowFiles [{}] Records [{}]", flowFilesAsString, totalRecordCount); Review Comment: I would use `debug` log here. ########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableRestService.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.airtable.service; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.Range; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +public class AirtableRestService { + + public static final String API_V0_BASE_URL = "https://api.airtable.com/v0"; + + private static final int TOO_MANY_REQUESTS = 429; + private static final Range<Integer> SUCCESSFUL_RESPONSE_RANGE = Range.between(200, 299); + + private final WebClientServiceProvider webClientServiceProvider; + private final String apiUrl; + private final String apiKey; + private final String baseId; + private final String tableId; + + public AirtableRestService(final WebClientServiceProvider webClientServiceProvider, + final String apiUrl, + final String apiKey, + final String baseId, + final String tableId) { + this.webClientServiceProvider = webClientServiceProvider; + this.apiUrl = apiUrl; + this.apiKey = apiKey; + this.baseId = baseId; + this.tableId = tableId; + } + + public InputStream getRecords(final AirtableGetRecordsParameters filter) throws RateLimitExceededException { + final URI uri = buildUri(filter); + final HttpResponseEntity response = webClientServiceProvider.getWebClientService() + .get() + .uri(uri) + .header("Authorization", "Bearer " + apiKey) + .retrieve(); + + try { + if (SUCCESSFUL_RESPONSE_RANGE.contains(response.statusCode())) { + return response.body(); + } + if (response.statusCode() == TOO_MANY_REQUESTS) { + throw new RateLimitExceededException(); + } + final StringBuilder exceptionMessageBuilder = new StringBuilder("Invalid response. 
Code: " + response.statusCode()); Review Comment: "Invalid response" is not the right message because when you get e.g. 401 due to wrong credentials, that is a valid response from the server. "Error response" or similar may be better. ########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRecordSetFlowFileWriter.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.airtable.parse; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import java.io.IOException; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessSession; + +public class AirtableRecordSetFlowFileWriter { + + private final FlowFile flowFile; + private final JsonGenerator jsonGenerator; + private int recordCount = 0; + + private AirtableRecordSetFlowFileWriter(final FlowFile flowFile, final JsonGenerator jsonGenerator) { + this.flowFile = flowFile; + this.jsonGenerator = jsonGenerator; + } + + public static AirtableRecordSetFlowFileWriter startRecordSet(final ProcessSession session) throws + IOException { Review Comment: ```suggestion public static AirtableRecordSetFlowFileWriter startRecordSet(final ProcessSession session) throws IOException { ``` ########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.airtable; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; 
+import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult; +import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; +import org.apache.nifi.processors.airtable.service.RateLimitExceededException; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@TriggerWhenEmpty +@Tags({"airtable", "query", "database"}) +@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records." + + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API." + + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading." + + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time." + + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query." 
+ + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Records Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Records Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Records Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced"), +}) +@DefaultSettings(yieldDuration = "35 sec") +public class QueryAirtableTable extends AbstractProcessor { + + static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() + .name("api-url") + .displayName("API URL") + .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. 
https://api.airtable.com/v0).") + .defaultValue(API_V0_BASE_URL) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder() + .name("api-key") + .displayName("API Key") + .description("The REST API key to use in queries. Should be generated on Airtable's account page.") + .required(true) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder() + .name("base-id") + .displayName("Base ID") + .description("The ID of the Airtable base to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder() + .name("table-id") + .displayName("Table ID") + .description("The name or the ID of the Airtable table to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() + .name("fields") + .displayName("Fields") + .description("Comma-separated list of fields to query from the table. 
Both the field's name and ID can be used.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor CUSTOM_FILTER = new PropertyDescriptor.Builder() + .name("custom-filter") + .displayName("Custom Filter") + .description("Filter records by Airtable's formulas.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder() + .name("page-size") + .displayName("Page Size") + .description("Number of records to be fetched in a page. Should be between 0 and 100 inclusively.") + .defaultValue("0") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.createLongValidator(0, 100, true)) + .build(); + + static final PropertyDescriptor MAX_RECORDS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("max-records-per-flow-file") + .displayName("Max Records Per FlowFile") + .description("The maximum number of result records that will be included in a single FlowFile. This will allow you to break up very large" + + " result sets into multiple FlowFiles. 
If the value specified is zero, then all records are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Web Client Service Provider to use for Airtable REST API requests") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful query.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + API_URL, + API_KEY, + BASE_ID, + TABLE_ID, + FIELDS, + CUSTOM_FILTER, + PAGE_SIZE, + MAX_RECORDS_PER_FLOW_FILE, + WEB_CLIENT_SERVICE_PROVIDER + )); + + private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS); + + private static final String LAST_RECORD_FETCH_TIME = "last_record_fetch_time"; + private static final int QUERY_LAG_SECONDS = 1; + + private volatile AirtableRestService airtableRestService; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + final String apiUrl = context.getProperty(API_URL).evaluateAttributeExpressions().getValue(); + final String apiKey = context.getProperty(API_KEY).getValue(); + final String baseId = context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue(); + final String tableId = context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue(); + 
final WebClientServiceProvider webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + airtableRestService = new AirtableRestService(webClientServiceProvider, apiUrl, apiKey, baseId, tableId); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final Integer maxRecordsPerFlowFile = context.getProperty(MAX_RECORDS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + + final StateMap state; + try { + state = context.getStateManager().getState(Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Failed to get cluster state", e); + } + + final String lastRecordFetchDateTime = state.get(LAST_RECORD_FETCH_TIME); + final String currentRecordFetchDateTime = OffsetDateTime.now() + .minusSeconds(QUERY_LAG_SECONDS) + .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + + final AirtableGetRecordsParameters getRecordsParameters = buildGetRecordsParameters(context, lastRecordFetchDateTime, currentRecordFetchDateTime); + final AirtableRetrieveTableResult retrieveTableResult; + try { + final AirtableTableRetriever tableRetriever = new AirtableTableRetriever(airtableRestService, getRecordsParameters, maxRecordsPerFlowFile); + retrieveTableResult = tableRetriever.retrieveAll(session); + } catch (IOException e) { + throw new ProcessException("Failed to read Airtable records", e); + } catch (RateLimitExceededException e) { + context.yield(); + throw new ProcessException("Airtable REST API rate limit exceeded while reading records", e); + } + + final List<FlowFile> flowFiles = retrieveTableResult.getFlowFiles(); + if (flowFiles.isEmpty()) { + return; + } + + if (maxRecordsPerFlowFile != null && maxRecordsPerFlowFile > 0) { + fragmentFlowFiles(session, flowFiles); + } + transferFlowFiles(session, flowFiles, retrieveTableResult.getTotalRecordCount()); + + final Map<String, String> newState = new 
HashMap<>(state.toMap()); + newState.put(LAST_RECORD_FETCH_TIME, currentRecordFetchDateTime); + try { + context.getStateManager().setState(newState, Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Failed to update cluster state", e); + } + } + + private AirtableGetRecordsParameters buildGetRecordsParameters(final ProcessContext context, + final String lastRecordFetchTime, + final String nowDateTimeString) { + final String fieldsProperty = context.getProperty(FIELDS).evaluateAttributeExpressions().getValue(); + final String customFilter = context.getProperty(CUSTOM_FILTER).evaluateAttributeExpressions().getValue(); + final Integer pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions().asInteger(); + + final AirtableGetRecordsParameters.Builder getRecordsParametersBuilder = new AirtableGetRecordsParameters.Builder(); + if (lastRecordFetchTime != null) { + getRecordsParametersBuilder + .modifiedAfter(lastRecordFetchTime) + .modifiedBefore(nowDateTimeString); + } + if (fieldsProperty != null) { + getRecordsParametersBuilder.fields(Arrays.stream(fieldsProperty.split(",")).map(String::trim).collect(Collectors.toList())); + } + getRecordsParametersBuilder.customFilter(customFilter); + if (pageSize != null && pageSize > 0) { + getRecordsParametersBuilder.pageSize(pageSize); + } + + return getRecordsParametersBuilder.build(); + } + + private void fragmentFlowFiles(final ProcessSession session, final List<FlowFile> flowFiles) { + final String fragmentIdentifier = UUID.randomUUID().toString(); + for (int i = 0; i < flowFiles.size(); i++) { + final Map<String, String> fragmentAttributes = new HashMap<>(); + fragmentAttributes.put(FRAGMENT_ID.key(), fragmentIdentifier); + fragmentAttributes.put(FRAGMENT_INDEX.key(), String.valueOf(i)); + fragmentAttributes.put(FRAGMENT_COUNT.key(), String.valueOf(flowFiles.size())); + + flowFiles.set(i, session.putAllAttributes(flowFiles.get(i), fragmentAttributes)); + } + } + + private void 
transferFlowFiles(final ProcessSession session, final List<FlowFile> flowFiles, final int totalRecordCount) { + session.transfer(flowFiles, REL_SUCCESS); Review Comment: The processor should generate a `RECEIVE` provenance event for each successful FlowFile. ########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.airtable; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; 
+import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult; +import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; +import org.apache.nifi.processors.airtable.service.RateLimitExceededException; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@TriggerWhenEmpty +@Tags({"airtable", "query", "database"}) +@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records." + + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API." + + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading." + + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time." + + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query." 
+ + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Records Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Records Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Records Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced"), +}) +@DefaultSettings(yieldDuration = "35 sec") +public class QueryAirtableTable extends AbstractProcessor { + + static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() + .name("api-url") + .displayName("API URL") + .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. 
https://api.airtable.com/v0).") + .defaultValue(API_V0_BASE_URL) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder() + .name("api-key") + .displayName("API Key") + .description("The REST API key to use in queries. Should be generated on Airtable's account page.") + .required(true) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder() + .name("base-id") + .displayName("Base ID") + .description("The ID of the Airtable base to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder() + .name("table-id") + .displayName("Table ID") + .description("The name or the ID of the Airtable table to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() + .name("fields") + .displayName("Fields") + .description("Comma-separated list of fields to query from the table. 
Both the field's name and ID can be used.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor CUSTOM_FILTER = new PropertyDescriptor.Builder() + .name("custom-filter") + .displayName("Custom Filter") + .description("Filter records by Airtable's formulas.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder() + .name("page-size") + .displayName("Page Size") + .description("Number of records to be fetched in a page. Should be between 0 and 100 inclusively.") + .defaultValue("0") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.createLongValidator(0, 100, true)) + .build(); + + static final PropertyDescriptor MAX_RECORDS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("max-records-per-flow-file") + .displayName("Max Records Per FlowFile") + .description("The maximum number of result records that will be included in a single FlowFile. This will allow you to break up very large" + + " result sets into multiple FlowFiles. 
If the value specified is zero, then all records are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Web Client Service Provider to use for Airtable REST API requests") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful query.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + API_URL, + API_KEY, + BASE_ID, + TABLE_ID, + FIELDS, + CUSTOM_FILTER, + PAGE_SIZE, + MAX_RECORDS_PER_FLOW_FILE, + WEB_CLIENT_SERVICE_PROVIDER + )); + + private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS); + + /* Cluster-state key holding the timestamp of the last successful record fetch (see @Stateful on the class). */ private static final String LAST_RECORD_FETCH_TIME = "last_record_fetch_time"; + /* The query window's upper bound is pulled back by this many seconds from "now" — presumably to avoid missing records modified concurrently with the query; TODO(review) confirm intent. */ private static final int QUERY_LAG_SECONDS = 1; + + /* (Re)created in onScheduled and used by onTrigger; volatile so the assignment is visible across framework threads. */ private volatile AirtableRestService airtableRestService; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + /* Resolves the connection-related properties and builds the REST service used for all subsequent queries. Note: the API Key is a sensitive property and is read without expression evaluation. */ @OnScheduled + public void onScheduled(final ProcessContext context) { + final String apiUrl = context.getProperty(API_URL).evaluateAttributeExpressions().getValue(); + final String apiKey = context.getProperty(API_KEY).getValue(); + final String baseId = context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue(); + final String tableId = context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue(); + 
final WebClientServiceProvider webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + airtableRestService = new AirtableRestService(webClientServiceProvider, apiUrl, apiKey, baseId, tableId); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final Integer maxRecordsPerFlowFile = context.getProperty(MAX_RECORDS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + + final StateMap state; + try { + state = context.getStateManager().getState(Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Failed to get cluster state", e); + } + + final String lastRecordFetchDateTime = state.get(LAST_RECORD_FETCH_TIME); + final String currentRecordFetchDateTime = OffsetDateTime.now() + .minusSeconds(QUERY_LAG_SECONDS) + .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + + final AirtableGetRecordsParameters getRecordsParameters = buildGetRecordsParameters(context, lastRecordFetchDateTime, currentRecordFetchDateTime); + final AirtableRetrieveTableResult retrieveTableResult; + try { + final AirtableTableRetriever tableRetriever = new AirtableTableRetriever(airtableRestService, getRecordsParameters, maxRecordsPerFlowFile); + retrieveTableResult = tableRetriever.retrieveAll(session); + } catch (IOException e) { + throw new ProcessException("Failed to read Airtable records", e); + } catch (RateLimitExceededException e) { + context.yield(); + throw new ProcessException("Airtable REST API rate limit exceeded while reading records", e); + } + + final List<FlowFile> flowFiles = retrieveTableResult.getFlowFiles(); + if (flowFiles.isEmpty()) { + return; Review Comment: The processor should yield when no new records available. Otherwise it will ping the Airtable service in a quick loop. 
########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableTableRetriever.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable.parse; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator.Feature; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; + +public class AirtableTableRetriever { + + static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory() + .configure(Feature.AUTO_CLOSE_JSON_CONTENT, false); + + final 
AirtableRestService airtableRestService; + final AirtableGetRecordsParameters getRecordsParameters; + final Integer maxRecordsPerFlowFile; + + public AirtableTableRetriever(final AirtableRestService airtableRestService, + final AirtableGetRecordsParameters getRecordsParameters, + final Integer maxRecordsPerFlowFile) { + this.airtableRestService = airtableRestService; + this.getRecordsParameters = getRecordsParameters; + this.maxRecordsPerFlowFile = maxRecordsPerFlowFile; + } + + public AirtableRetrieveTableResult retrieveAll(final ProcessSession session) throws IOException { + int totalRecordCount = 0; + final List<FlowFile> flowFiles = new ArrayList<>(); + AirtableRetrievePageResult retrievePageResult = null; + do { + retrievePageResult = retrieveNextPage(session, Optional.ofNullable(retrievePageResult)); + totalRecordCount += retrievePageResult.getParsedRecordCount(); + flowFiles.addAll(retrievePageResult.getFlowFiles()); + } while (retrievePageResult.getNextOffset().isPresent()); + + retrievePageResult.getOngoingRecordSetFlowFileWriter() + .map(writer -> { + try { + return writer.closeRecordSet(session); + } catch (IOException e) { + throw new ProcessException("Failed to close Airtable record writer", e); + } + }) + .ifPresent(flowFiles::add); + return new AirtableRetrieveTableResult(flowFiles, totalRecordCount); + } + + private AirtableRetrievePageResult retrieveNextPage(final ProcessSession session, final Optional<AirtableRetrievePageResult> previousPagePageResult) + throws IOException { + final AirtableGetRecordsParameters parameters = previousPagePageResult.flatMap(AirtableRetrievePageResult::getNextOffset) + .map(getRecordsParameters::withOffset) + .orElse(getRecordsParameters); + final InputStream inputStream = airtableRestService.getRecords(parameters); Review Comment: This InputStream should be closed. It comes from `HttpResponseEntity` which is `AutoClosable` and it would close the stream if it was used in a TWR block. 
I see it would be complicated to restructure the code to process `HttpResponseEntity` in a single block. In this case we must close the InputStream ourselves. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
