[ 
https://issues.apache.org/jira/browse/NIFI-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15381560#comment-15381560
 ] 

ASF GitHub Bot commented on NIFI-2156:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71091358
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
 ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * A processor to retrieve a list of tables (and their metadata) from a 
database connection
    + */
    +@TriggerSerially
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"sql", "list", "jdbc", "table", "database"})
    +@CapabilityDescription("Generates a set of flow files, each containing 
attributes corresponding to metadata about a table from a database connection.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "db.table.name", description = 
"Contains the name of a database table from the connection"),
    +        @WritesAttribute(attribute = "db.table.catalog", description = 
"Contains the name of the catalog to which the table belongs (may be null)"),
    +        @WritesAttribute(attribute = "db.table.schema", description = 
"Contains the name of the schema to which the table belongs (may be null)"),
    +        @WritesAttribute(attribute = "db.table.fullname", description = 
"Contains the fully-qualifed table name (possibly including catalog, schema, 
etc.)"),
    +        @WritesAttribute(attribute = "db.table.type",
    +                description = "Contains the type of the database table 
from the connection. Typical types are \"TABLE\", \"VIEW\", \"SYSTEM TABLE\", "
    +                        + "\"GLOBAL TEMPORARY\", \"LOCAL TEMPORARY\", 
\"ALIAS\", \"SYNONYM\""),
    +        @WritesAttribute(attribute = "db.table.remarks", description = 
"Contains the name of a database table from the connection"),
    +        @WritesAttribute(attribute = "db.table.count", description = 
"Contains the number of rows in the table")
    +})
    +@Stateful(scopes = {Scope.LOCAL}, description = "After performing a 
listing of tables, the timestamp of the query is stored. "
    +        + "This allows the Processor to not re-list tables the next time 
that the Processor is run. Changing any of the processor properties will "
    +        + "indicate that the processor should reset state and thus re-list 
the tables using the new configuration. This processor is meant to be "
    +        + "run on the primary node only.")
    +public class ListDatabaseTables extends AbstractProcessor {
    +
    +    // Attribute names
    +    public static final String DB_TABLE_NAME = "db.table.name";
    +    public static final String DB_TABLE_CATALOG = "db.table.catalog";
    +    public static final String DB_TABLE_SCHEMA = "db.table.schema";
    +    public static final String DB_TABLE_FULLNAME = "db.table.fullname";
    +    public static final String DB_TABLE_TYPE = "db.table.type";
    +    public static final String DB_TABLE_REMARKS = "db.table.remarks";
    +    public static final String DB_TABLE_COUNT = "db.table.count";
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are received are routed to 
success")
    +            .build();
    +
    +    // Property descriptors
    +    public static final PropertyDescriptor DBCP_SERVICE = new 
PropertyDescriptor.Builder()
    +            .name("list-db-tables-db-connection")
    +            .displayName("Database Connection Pooling Service")
    +            .description("The Controller Service that is used to obtain 
connection to database")
    +            .required(true)
    +            .identifiesControllerService(DBCPService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CATALOG = new 
PropertyDescriptor.Builder()
    +            .name("list-db-tables-catalog")
    +            .displayName("Catalog")
    +            .description("The name of a catalog from which to list 
database tables. The name must match the catalog name as it is stored in the 
database. "
    +                    + "If the property is not set, the catalog name will 
not be used to narrow the search for tables. If the property is set to an empty 
string, "
    +                    + "tables without a catalog will be listed.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor SCHEMA_PATTERN = new 
PropertyDescriptor.Builder()
    +            .name("list-db-tables-schema-pattern")
    +            .displayName("Schema Pattern")
    +            .description("A pattern for matching schemas in the database. 
Within a pattern, \"%\" means match any substring of 0 or more characters, "
    +                    + "and \"_\" means match any one character. The 
pattern must match the schema name as it is stored in the database. "
    +                    + "If the property is not set, the schema name will 
not be used to narrow the search for tables. If the property is set to an empty 
string, "
    +                    + "tables without a schema will be listed.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME_PATTERN = new 
PropertyDescriptor.Builder()
    +            .name("list-db-tables-name-pattern")
    +            .displayName("Table Name Pattern")
    +            .description("A pattern for matching tables in the database. 
Within a pattern, \"%\" means match any substring of 0 or more characters, "
    +                    + "and \"_\" means match any one character. The 
pattern must match the table name as it is stored in the database. "
    +                    + "If the property is not set, all tables will be 
retrieved.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_TYPES = new 
PropertyDescriptor.Builder()
    +            .name("list-db-tables-types")
    +            .displayName("Table Types")
    +            .description("A comma-separated list of table types to 
include. For example, some databases support TABLE and VIEW types. If the 
property is not set, "
    +                    + "tables of all types will be returned.")
    +            .required(false)
    +            .defaultValue("TABLE")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor INCLUDE_COUNT = new 
PropertyDescriptor.Builder()
    +            .name("list-db-include-count")
    +            .displayName("Include Count")
    +            .description("Whether to include the table's row count as a 
flow file attribute. This affects performance as a database query will be 
generated "
    +                    + "for each table in the retrieved list.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    private static final Set<Relationship> relationships;
    +
    +    private boolean resetState = false;
    +
    +    /*
    +     * Will ensure that the list of property descriptors is build only 
once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(DBCP_SERVICE);
    +        _propertyDescriptors.add(CATALOG);
    +        _propertyDescriptors.add(SCHEMA_PATTERN);
    +        _propertyDescriptors.add(TABLE_NAME_PATTERN);
    +        _propertyDescriptors.add(TABLE_TYPES);
    +        _propertyDescriptors.add(INCLUDE_COUNT);
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        try {
    +            if (resetState) {
    +                context.getStateManager().clear(Scope.LOCAL);
    +                resetState = false;
    +            }
    +        } catch (IOException ioe) {
    +            throw new ProcessException(ioe);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        final ComponentLog logger = getLogger();
    +        final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    +        final String catalog = context.getProperty(CATALOG).getValue();
    +        final String schemaPattern = 
context.getProperty(SCHEMA_PATTERN).getValue();
    +        final String tableNamePattern = 
context.getProperty(TABLE_NAME_PATTERN).getValue();
    +        final String[] tableTypes = 
context.getProperty(TABLE_TYPES).isSet()
    +                ? 
context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*")
    +                : null;
    +        final boolean includeCount = 
context.getProperty(INCLUDE_COUNT).asBoolean();
    +
    +        final StateManager stateManager = context.getStateManager();
    +        final StateMap stateMap;
    +        final Map<String, String> stateMapProperties;
    +        try {
    +            stateMap = stateManager.getState(Scope.LOCAL);
    +            stateMapProperties = new HashMap<>(stateMap.toMap());
    +        } catch (IOException ioe) {
    +            throw new ProcessException(ioe);
    +        }
    +
    +        try (final Connection con = dbcpService.getConnection()) {
    +
    +            DatabaseMetaData dbMetaData = con.getMetaData();
    +            ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, 
tableNamePattern, tableTypes);
    +            while (rs.next()) {
    +                final String tableCatalog = rs.getString(1);
    +                final String tableSchema = rs.getString(2);
    +                final String tableName = rs.getString(3);
    +                final String tableType = rs.getString(4);
    +                final String tableRemarks = rs.getString(5);
    +
    +                // Build fully-qualified name
    +                String fqn = Stream.of(tableCatalog, tableSchema, 
tableName)
    +                        .filter(segment -> !StringUtils.isEmpty(segment))
    +                        .collect(Collectors.joining("."));
    +
    +                String fqTableName = stateMap.get(fqn);
    +                if (fqTableName == null) {
    +                    FlowFile flowFile = session.create();
    +                    logger.info("Found {}: {}", new Object[]{tableType, 
fqn});
    +                    if (includeCount) {
    +                        try (Statement st = con.createStatement()) {
    +                            final String countQuery = "SELECT COUNT(1) 
FROM " + fqn;
    +
    +                            logger.debug("Executing query: {}", new 
Object[]{countQuery});
    +                            ResultSet countResult = 
st.executeQuery(countQuery);
    +                            if (countResult.next()) {
    +                                flowFile = session.putAttribute(flowFile, 
DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
    +                            }
    +                        } catch (SQLException se) {
    +                            logger.error("Couldn't get row count for {}", 
new Object[]{fqn});
    +                            session.remove(flowFile);
    --- End diff --
    
    Wouldn't removing a flowfile essentially skip whatever information 
would've been in that row, leading to data loss?


> Add ListDatabaseTables processor
> --------------------------------
>
>                 Key: NIFI-2156
>                 URL: https://issues.apache.org/jira/browse/NIFI-2156
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> This processor would use a DatabaseConnectionPool controller service, call 
> getTables(), and if the (optional, defaulting-to-false) property "Include Row 
> Count" is set, then a "SELECT COUNT(1) from table" would be issued to the 
> database. The table catalog, schema, name, type, remarks (and its count if 
> specified) will be included as attributes in a zero-content flow file.
> It will also use State Management to only list tables once. If new tables are 
> added (and the processor is running), then the new tables' flow files will be 
> generated. Changing any property that could affect the list of returned 
> tables (such as the DB Connection, catalog, schema pattern, table name 
> pattern, or table types) will reset the state and all tables will be fetched 
> using the new criteria. The state can also be manually cleared using the 
> standard Clear State link on the View State dialog (available on the 
> processor's context menu).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to