[
https://issues.apache.org/jira/browse/NIFI-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15381560#comment-15381560
]
ASF GitHub Bot commented on NIFI-2156:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/642#discussion_r71091358
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A processor to retrieve a list of tables (and their metadata) from a
database connection
+ */
+@TriggerSerially
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "list", "jdbc", "table", "database"})
+@CapabilityDescription("Generates a set of flow files, each containing
attributes corresponding to metadata about a table from a database connection.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "db.table.name", description =
"Contains the name of a database table from the connection"),
+ @WritesAttribute(attribute = "db.table.catalog", description =
"Contains the name of the catalog to which the table belongs (may be null)"),
+ @WritesAttribute(attribute = "db.table.schema", description =
"Contains the name of the schema to which the table belongs (may be null)"),
+ @WritesAttribute(attribute = "db.table.fullname", description =
"Contains the fully-qualifed table name (possibly including catalog, schema,
etc.)"),
+ @WritesAttribute(attribute = "db.table.type",
+ description = "Contains the type of the database table
from the connection. Typical types are \"TABLE\", \"VIEW\", \"SYSTEM TABLE\", "
+ + "\"GLOBAL TEMPORARY\", \"LOCAL TEMPORARY\",
\"ALIAS\", \"SYNONYM\""),
+ @WritesAttribute(attribute = "db.table.remarks", description =
"Contains the name of a database table from the connection"),
+ @WritesAttribute(attribute = "db.table.count", description =
"Contains the number of rows in the table")
+})
+@Stateful(scopes = {Scope.LOCAL}, description = "After performing a
listing of tables, the timestamp of the query is stored. "
+ + "This allows the Processor to not re-list tables the next time
that the Processor is run. Changing any of the processor properties will "
+ + "indicate that the processor should reset state and thus re-list
the tables using the new configuration. This processor is meant to be "
+ + "run on the primary node only.")
+public class ListDatabaseTables extends AbstractProcessor {
+
+ // Attribute names
+ public static final String DB_TABLE_NAME = "db.table.name";
+ public static final String DB_TABLE_CATALOG = "db.table.catalog";
+ public static final String DB_TABLE_SCHEMA = "db.table.schema";
+ public static final String DB_TABLE_FULLNAME = "db.table.fullname";
+ public static final String DB_TABLE_TYPE = "db.table.type";
+ public static final String DB_TABLE_REMARKS = "db.table.remarks";
+ public static final String DB_TABLE_COUNT = "db.table.count";
+
+ // Relationships
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles that are received are routed to
success")
+ .build();
+
+ // Property descriptors
+ public static final PropertyDescriptor DBCP_SERVICE = new
PropertyDescriptor.Builder()
+ .name("list-db-tables-db-connection")
+ .displayName("Database Connection Pooling Service")
+ .description("The Controller Service that is used to obtain
connection to database")
+ .required(true)
+ .identifiesControllerService(DBCPService.class)
+ .build();
+
+ public static final PropertyDescriptor CATALOG = new
PropertyDescriptor.Builder()
+ .name("list-db-tables-catalog")
+ .displayName("Catalog")
+ .description("The name of a catalog from which to list
database tables. The name must match the catalog name as it is stored in the
database. "
+ + "If the property is not set, the catalog name will
not be used to narrow the search for tables. If the property is set to an empty
string, "
+ + "tables without a catalog will be listed.")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor SCHEMA_PATTERN = new
PropertyDescriptor.Builder()
+ .name("list-db-tables-schema-pattern")
+ .displayName("Schema Pattern")
+ .description("A pattern for matching schemas in the database.
Within a pattern, \"%\" means match any substring of 0 or more characters, "
+ + "and \"_\" means match any one character. The
pattern must match the schema name as it is stored in the database. "
+ + "If the property is not set, the schema name will
not be used to narrow the search for tables. If the property is set to an empty
string, "
+ + "tables without a schema will be listed.")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor TABLE_NAME_PATTERN = new
PropertyDescriptor.Builder()
+ .name("list-db-tables-name-pattern")
+ .displayName("Table Name Pattern")
+ .description("A pattern for matching tables in the database.
Within a pattern, \"%\" means match any substring of 0 or more characters, "
+ + "and \"_\" means match any one character. The
pattern must match the table name as it is stored in the database. "
+ + "If the property is not set, all tables will be
retrieved.")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor TABLE_TYPES = new
PropertyDescriptor.Builder()
+ .name("list-db-tables-types")
+ .displayName("Table Types")
+ .description("A comma-separated list of table types to
include. For example, some databases support TABLE and VIEW types. If the
property is not set, "
+ + "tables of all types will be returned.")
+ .required(false)
+ .defaultValue("TABLE")
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor INCLUDE_COUNT = new
PropertyDescriptor.Builder()
+ .name("list-db-include-count")
+ .displayName("Include Count")
+ .description("Whether to include the table's row count as a
flow file attribute. This affects performance as a database query will be
generated "
+ + "for each table in the retrieved list.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ private static final List<PropertyDescriptor> propertyDescriptors;
+ private static final Set<Relationship> relationships;
+
+ private boolean resetState = false;
+
+ /*
+ * Will ensure that the list of property descriptors is built only
once.
+ * Will also create a Set of relationships
+ */
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(DBCP_SERVICE);
+ _propertyDescriptors.add(CATALOG);
+ _propertyDescriptors.add(SCHEMA_PATTERN);
+ _propertyDescriptors.add(TABLE_NAME_PATTERN);
+ _propertyDescriptors.add(TABLE_TYPES);
+ _propertyDescriptors.add(INCLUDE_COUNT);
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @OnScheduled
+ public void setup(ProcessContext context) {
+ try {
+ if (resetState) {
+ context.getStateManager().clear(Scope.LOCAL);
+ resetState = false;
+ }
+ } catch (IOException ioe) {
+ throw new ProcessException(ioe);
+ }
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final ComponentLog logger = getLogger();
+ final DBCPService dbcpService =
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+ final String catalog = context.getProperty(CATALOG).getValue();
+ final String schemaPattern =
context.getProperty(SCHEMA_PATTERN).getValue();
+ final String tableNamePattern =
context.getProperty(TABLE_NAME_PATTERN).getValue();
+ final String[] tableTypes =
context.getProperty(TABLE_TYPES).isSet()
+ ?
context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*")
+ : null;
+ final boolean includeCount =
context.getProperty(INCLUDE_COUNT).asBoolean();
+
+ final StateManager stateManager = context.getStateManager();
+ final StateMap stateMap;
+ final Map<String, String> stateMapProperties;
+ try {
+ stateMap = stateManager.getState(Scope.LOCAL);
+ stateMapProperties = new HashMap<>(stateMap.toMap());
+ } catch (IOException ioe) {
+ throw new ProcessException(ioe);
+ }
+
+ try (final Connection con = dbcpService.getConnection()) {
+
+ DatabaseMetaData dbMetaData = con.getMetaData();
+ ResultSet rs = dbMetaData.getTables(catalog, schemaPattern,
tableNamePattern, tableTypes);
+ while (rs.next()) {
+ final String tableCatalog = rs.getString(1);
+ final String tableSchema = rs.getString(2);
+ final String tableName = rs.getString(3);
+ final String tableType = rs.getString(4);
+ final String tableRemarks = rs.getString(5);
+
+ // Build fully-qualified name
+ String fqn = Stream.of(tableCatalog, tableSchema,
tableName)
+ .filter(segment -> !StringUtils.isEmpty(segment))
+ .collect(Collectors.joining("."));
+
+ String fqTableName = stateMap.get(fqn);
+ if (fqTableName == null) {
+ FlowFile flowFile = session.create();
+ logger.info("Found {}: {}", new Object[]{tableType,
fqn});
+ if (includeCount) {
+ try (Statement st = con.createStatement()) {
+ final String countQuery = "SELECT COUNT(1)
FROM " + fqn;
+
+ logger.debug("Executing query: {}", new
Object[]{countQuery});
+ ResultSet countResult =
st.executeQuery(countQuery);
+ if (countResult.next()) {
+ flowFile = session.putAttribute(flowFile,
DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
+ }
+ } catch (SQLException se) {
+ logger.error("Couldn't get row count for {}",
new Object[]{fqn});
+ session.remove(flowFile);
--- End diff --
Wouldn't removing a flowfile essentially skip whatever information
would've been in that row, leading to data loss?
> Add ListDatabaseTables processor
> --------------------------------
>
> Key: NIFI-2156
> URL: https://issues.apache.org/jira/browse/NIFI-2156
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Fix For: 1.0.0
>
>
> This processor would use a DatabaseConnectionPool controller service, call
> getTables(), and if the (optional, defaulting-to-false) property "Include Row
> Count" is set, then a "SELECT COUNT(1) from table" would be issued to the
> database. The table catalog, schema, name, type, remarks (and its count if
> specified) will be included as attributes in a zero-content flow file.
> It will also use State Management to only list tables once. If new tables are
> added (and the processor is running), then the new tables' flow files will be
> generated. Changing any property that could affect the list of returned
> tables (such as the DB Connection, catalog, schema pattern, table name
> pattern, or table types) will reset the state and all tables will be fetched
> using the new criteria. The state can also be manually cleared using the
> standard Clear State link on the View State dialog (available on the
> processor's context menu).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)