exceptionfactory commented on code in PR #6350: URL: https://github.com/apache/nifi/pull/6350#discussion_r960609452
########## nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import 
org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.airtable.record.AirtableRecordSet; +import org.apache.nifi.processors.airtable.record.AirtableJsonTreeRowRecordReaderFactory; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; +import org.apache.nifi.processors.airtable.service.AirtableRestService.RateLimitExceededException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.JsonRecordReaderFactory; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import 
org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@TriggerWhenEmpty +@Tags({"airtable", "query", "database"}) +@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records." + + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API." + + " Schema can be provided by setting up a JsonTreeReader controller service properly. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading." + + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time." + + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query." 
+ + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected," + + " the new node can pick up where the previous node left off, without duplicating the data.") +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."), + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.") +}) +@DefaultSettings(yieldDuration = "35 sec") +public class QueryAirtableTable extends AbstractProcessor { + + static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() + .name("api-url") + .displayName("API URL") + .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. https://api.airtable.com/v0).") + .defaultValue(API_V0_BASE_URL) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder() + .name("api-key") + .displayName("API Key") + .description("The REST API key to use in queries. 
Should be generated on Airtable's account page.") + .required(true) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder() + .name("base-id") + .displayName("Base ID") + .description("The ID of the Airtable base to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder() + .name("table-id") + .displayName("Table Name or ID") + .description("The name or the ID of the Airtable table to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() + .name("fields") + .displayName("Fields") + .description("Comma-separated list of fields to query from the table. Both the field's name and ID can be used.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor CUSTOM_FILTER = new PropertyDescriptor.Builder() + .name("custom-filter") + .displayName("Custom Filter") + .description("Filter records by Airtable's formulas.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder() + .name("page-size") + .displayName("Page Size") + .description("Number of records to be fetched in a page. Should be between 0 and 100 inclusively.") Review Comment: Is 100 the maximum number supported by the Airtable service? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
