exceptionfactory commented on code in PR #8042:
URL: https://github.com/apache/nifi/pull/8042#discussion_r1415801708


##########
nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-db-schema-registry-service/src/main/java/org/apache/nifi/db/schemaregistry/DatabaseTableSchemaRegistry.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.db.schemaregistry;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+@Tags({"schema", "registry", "database", "table"})
+@CapabilityDescription("Provides a service for generating a record schema from 
a database table definition. The service is configured "
+        + "to use a table name and a database connection fetches the table 
metadata (i.e. table definition) such as column names, data types, "
+        + "nullability, etc.")
+public class DatabaseTableSchemaRegistry extends AbstractControllerService 
implements SchemaRegistry {
+
+    private static final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_NAME);
+
+    static final PropertyDescriptor DBCP_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Database Connection Pooling Service")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain a 
connection to the database for retrieving table information.")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAME = new 
PropertyDescriptor.Builder()
+            .name("Catalog Name")
+            .displayName("Catalog Name")
+            .description("The name of the catalog used to locate the desired 
table. This may not apply for the database that you are querying. In this case, 
leave the field empty. Note that if the "
+                    + "property is set and the database is case-sensitive, the 
catalog name must match the database's catalog name exactly.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_NAME = new 
PropertyDescriptor.Builder()
+            .name("Schema Name")
+            .displayName("Schema Name")
+            .description("The name of the schema that the table belongs to. 
This may not apply for the database that you are updating. In this case, leave 
the field empty. Note that if the "
+                    + "property is set and the database is case-sensitive, the 
schema name must match the database's schema name exactly. Also notice that if 
the same table name exists in multiple "
+                    + "schemas and Schema Name is not specified, the service 
will find those tables and give an error if the different tables have the same 
column name(s).")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected List<PropertyDescriptor> propDescriptors = 
Collections.unmodifiableList(Arrays.asList(
+            DBCP_SERVICE,
+            CATALOG_NAME,
+            SCHEMA_NAME
+    ));
+
+    private volatile DBCPService dbcpService;
+    private volatile String dbCatalogName;
+    private volatile String dbSchemaName;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propDescriptors;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        dbCatalogName = 
context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue();
+        dbSchemaName = 
context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions().getValue();
+    }
+
+    @Override
+    public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) 
throws IOException, SchemaNotFoundException {
+        if (schemaIdentifier.getName().isPresent()) {
+            return retrieveSchemaByName(schemaIdentifier);
+        } else {
+            throw new SchemaNotFoundException("This Schema Registry only 
supports retrieving a schema by name.");
+        }
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+
+    RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) 
throws IOException, SchemaNotFoundException {
+        final Optional<String> schemaName = schemaIdentifier.getName();
+        if (schemaName.isEmpty()) {
+            throw new SchemaNotFoundException("Cannot retrieve schema because 
Schema Name is not present");
+        }
+
+        final String tableName = schemaName.get();
+        try {
+            try (final Connection conn = dbcpService.getConnection()) {
+                final DatabaseMetaData databaseMetaData = conn.getMetaData();
+                    return getRecordSchemaFromMetadata(databaseMetaData, 
tableName);
+                }
+        } catch (SQLException sqle) {
+            throw new IOException("Error retrieving schema for table " + 
schemaName.get(), sqle);
+        }
+    }
+
+    private RecordSchema getRecordSchemaFromMetadata(final DatabaseMetaData 
databaseMetaData, final String tableName) throws SQLException, 
SchemaNotFoundException {
+        try (final ResultSet columnResultSet = 
databaseMetaData.getColumns(dbCatalogName, dbSchemaName, tableName, "%")) {
+
+            final List<RecordField> recordFields = new ArrayList<>();
+            while (columnResultSet.next()) {
+                recordFields.add(createRecordFieldFromColumn(columnResultSet));
+            }
+
+            // If no columns are found, check that the table exists
+            if (recordFields.isEmpty()) {
+                checkTableExists(databaseMetaData, tableName);
+            }
+            return new SimpleRecordSchema(recordFields);
+        }
+    }
+
+    private RecordField createRecordFieldFromColumn(final ResultSet 
columnResultSet) throws SQLException {
+        // COLUMN_DEF must be read first to work around Oracle bug, see 
NIFI-4279 for details
+        final String defaultValue = columnResultSet.getString("COLUMN_DEF");
+        final String columnName = columnResultSet.getString("COLUMN_NAME");
+        final int dataType = columnResultSet.getInt("DATA_TYPE");
+        final String nullableValue = columnResultSet.getString("IS_NULLABLE");
+        final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || 
nullableValue.isEmpty();
+        return new RecordField(
+                columnName,
+                DataTypeUtils.getDataTypeFromSQLTypeValue(dataType),
+                defaultValue,
+                isNullable);
+    }
+
+    private void checkTableExists(final DatabaseMetaData databaseMetaData, 
final String tableName) throws SchemaNotFoundException, SQLException {
+        try (final ResultSet tblrs = databaseMetaData.getTables(dbCatalogName, 
dbSchemaName, tableName, null)) {

Review Comment:
   Recommend renaming this variable from `tblrs` to `tables` for readability.
   ```suggestion
           try (final ResultSet tables = 
databaseMetaData.getTables(dbCatalogName, dbSchemaName, tableName, null)) {
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-db-schema-registry-service/src/test/java/org/apache/nifi/db/schemaregistry/DatabaseTableSchemaRegistryTest.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.db.schemaregistry;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.ConnectionUrlValidator;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.dbcp.DriverClassValidator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+
+public class DatabaseTableSchemaRegistryTest {
+
+    private static final List<String> CREATE_TABLE_STATEMENTS = Arrays.asList(
+            "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," 
+
+                    " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND 
code < 1000), dt date)",
+            "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name 
varchar(100)," +
+                    " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND 
code < 1000), dt date)",
+            "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name 
varchar(100)," +
+                    " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND 
code < 1000), dt date)",
+            "CREATE TABLE UUID_TEST (id integer primary key, name 
VARCHAR(100))",
+            "CREATE TABLE LONGVARBINARY_TEST (id integer primary key, name 
LONG VARCHAR FOR BIT DATA)"
+    );
+
+    private static final String SERVICE_ID = 
DBCPServiceSimpleImpl.class.getName();
+
+    private final static String DB_LOCATION = "target/db_schema_reg";
+
+    // This is to mimic those in DBCPProperties to avoid adding the dependency 
to nifi-dbcp-base
+    private static final PropertyDescriptor DATABASE_URL = new 
PropertyDescriptor.Builder()
+            .name("Database Connection URL")
+            .addValidator(new ConnectionUrlValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private static final PropertyDescriptor DB_USER = new 
PropertyDescriptor.Builder()
+            .name("Database User")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private static final PropertyDescriptor DB_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("Password")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+    private static final PropertyDescriptor DB_DRIVERNAME = new 
PropertyDescriptor.Builder()
+            .name("Database Driver Class Name")
+            .addValidator(new DriverClassValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private TestRunner runner;
+
+    @BeforeAll
+    public static void setupDatabase() throws Exception {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ignore) {
+            // Do nothing, may not have existed
+        }
+
+        // Mock the DBCP Controller Service so we can control the Results
+        DBCPService setupDbService = spy(new 
DBCPServiceSimpleImpl(DB_LOCATION));
+
+        final Connection conn = setupDbService.getConnection();
+        final Statement stmt = conn.createStatement();
+        for (String createTableStatement : CREATE_TABLE_STATEMENTS) {
+            stmt.execute(createTableStatement);
+        }
+
+        stmt.close();
+        conn.close();
+    }
+
+    @AfterAll
+    public static void shutdownDatabase() throws Exception {

Review Comment:
   The declared `Exception` is never thrown by this method, so the `throws` clause can be removed:
   ```suggestion
       public static void shutdownDatabase() {
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-db-schema-registry-service/src/test/java/org/apache/nifi/db/schemaregistry/DatabaseTableSchemaRegistryTest.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.db.schemaregistry;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.ConnectionUrlValidator;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.dbcp.DriverClassValidator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+
+public class DatabaseTableSchemaRegistryTest {
+
+    private static final List<String> CREATE_TABLE_STATEMENTS = Arrays.asList(
+            "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," 
+
+                    " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND 
code < 1000), dt date)",
+            "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name 
varchar(100)," +
+                    " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND 
code < 1000), dt date)",
+            "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name 
varchar(100)," +
+                    " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND 
code < 1000), dt date)",
+            "CREATE TABLE UUID_TEST (id integer primary key, name 
VARCHAR(100))",
+            "CREATE TABLE LONGVARBINARY_TEST (id integer primary key, name 
LONG VARCHAR FOR BIT DATA)"
+    );
+
+    private static final String SERVICE_ID = 
DBCPServiceSimpleImpl.class.getName();
+
+    private final static String DB_LOCATION = "target/db_schema_reg";
+
+    // This is to mimic those in DBCPProperties to avoid adding the dependency 
to nifi-dbcp-base
+    private static final PropertyDescriptor DATABASE_URL = new 
PropertyDescriptor.Builder()
+            .name("Database Connection URL")
+            .addValidator(new ConnectionUrlValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private static final PropertyDescriptor DB_USER = new 
PropertyDescriptor.Builder()
+            .name("Database User")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private static final PropertyDescriptor DB_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("Password")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+    private static final PropertyDescriptor DB_DRIVERNAME = new 
PropertyDescriptor.Builder()
+            .name("Database Driver Class Name")
+            .addValidator(new DriverClassValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private TestRunner runner;
+
+    @BeforeAll
+    public static void setupDatabase() throws Exception {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ignore) {
+            // Do nothing, may not have existed
+        }
+
+        // Mock the DBCP Controller Service so we can control the Results
+        DBCPService setupDbService = spy(new 
DBCPServiceSimpleImpl(DB_LOCATION));
+
+        final Connection conn = setupDbService.getConnection();
+        final Statement stmt = conn.createStatement();
+        for (String createTableStatement : CREATE_TABLE_STATEMENTS) {
+            stmt.execute(createTableStatement);
+        }
+
+        stmt.close();
+        conn.close();
+    }
+
+    @AfterAll
+    public static void shutdownDatabase() throws Exception {
+        try {
+            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + 
";shutdown=true");
+        } catch (Exception ignore) {
+            // Do nothing, this is what happens at Derby shutdown
+        }
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ignore) {
+            // Do nothing, may not have existed
+        }
+        System.clearProperty("derby.stream.error.file");
+    }
+
+    @BeforeEach
+    public void setService() throws InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        DBCPService dbcp = new DBCPServiceSimpleImpl(DB_LOCATION);
+        runner.addControllerService(SERVICE_ID, dbcp);
+
+        final String url = String.format("jdbc:derby:%s;create=false", 
DB_LOCATION);
+        runner.setProperty(dbcp, DATABASE_URL, url);
+        runner.setProperty(dbcp, DB_USER, String.class.getSimpleName());
+        runner.setProperty(dbcp, DB_PASSWORD, String.class.getName());
+        runner.setProperty(dbcp, DB_DRIVERNAME, 
"org.apache.derby.jdbc.EmbeddedDriver");
+        runner.enableControllerService(dbcp);
+    }
+
+    @Test
+    public void testGetSchemaExists() throws Exception {
+        DatabaseTableSchemaRegistry dbSchemaRegistry = new 
DatabaseTableSchemaRegistry();
+        runner.addControllerService("schemaRegistry", dbSchemaRegistry);
+        runner.setProperty(dbSchemaRegistry, 
DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID);
+        runner.setProperty(dbSchemaRegistry, 
DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1");
+        runner.enableControllerService(dbSchemaRegistry);
+        SchemaIdentifier schemaIdentifier = new 
StandardSchemaIdentifier.Builder()
+                .name("PERSONS")
+                .build();
+        RecordSchema recordSchema = 
dbSchemaRegistry.retrieveSchema(schemaIdentifier);
+        assertNotNull(recordSchema);
+        Optional<RecordField> recordField = recordSchema.getField("ID");
+        assertTrue(recordField.isPresent());
+        assertEquals(RecordFieldType.INT.getDataType(), 
recordField.get().getDataType());
+        recordField = recordSchema.getField("NAME");
+        assertTrue(recordField.isPresent());
+        assertEquals(RecordFieldType.STRING.getDataType(), 
recordField.get().getDataType());
+        recordField = recordSchema.getField("CODE");
+        assertTrue(recordField.isPresent());
+        assertEquals(RecordFieldType.INT.getDataType(), 
recordField.get().getDataType());
+        recordField = recordSchema.getField("DT");
+        assertTrue(recordField.isPresent());
+        assertEquals(RecordFieldType.DATE.getDataType(), 
recordField.get().getDataType());
+        // Get nonexistent field
+        recordField = recordSchema.getField("NOT_A_FIELD");
+        assertFalse(recordField.isPresent());
+    }
+
+    @Test
+    public void testGetSchemaNotExists() throws Exception {
+        DatabaseTableSchemaRegistry dbSchemaRegistry = new 
DatabaseTableSchemaRegistry();
+        runner.addControllerService("schemaRegistry", dbSchemaRegistry);
+        runner.setProperty(dbSchemaRegistry, 
DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID);
+        runner.setProperty(dbSchemaRegistry, 
DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1");
+        runner.enableControllerService(dbSchemaRegistry);
+        SchemaIdentifier schemaIdentifier = new 
StandardSchemaIdentifier.Builder()
+                .name("NOT_A_TABLE")
+                .build();
+        assertThrows(SchemaNotFoundException.class, () -> 
dbSchemaRegistry.retrieveSchema(schemaIdentifier));
+    }
+
+    private static class DBCPServiceSimpleImpl extends 
AbstractControllerService implements DBCPService {
+
+        private final String databaseLocation;
+
+        public DBCPServiceSimpleImpl(final String databaseLocation) {
+            this.databaseLocation = databaseLocation;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return this.getClass().getName();
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");

Review Comment:
   Recommend declaring this driver class name once as a constant and reusing 
it throughout the test class; the same literal also appears in `setService()`.



##########
nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-db-schema-registry-service/src/test/java/org/apache/nifi/db/schemaregistry/DatabaseTableSchemaRegistryTest.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.db.schemaregistry;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.ConnectionUrlValidator;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.dbcp.DriverClassValidator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+
+public class DatabaseTableSchemaRegistryTest {
+
+    private static final List<String> CREATE_TABLE_STATEMENTS = Arrays.asList(
+            "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," 
+
+                    " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND 
code < 1000), dt date)",
+            "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name 
varchar(100)," +
+                    " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND 
code < 1000), dt date)",
+            "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name 
varchar(100)," +
+                    " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND 
code < 1000), dt date)",
+            "CREATE TABLE UUID_TEST (id integer primary key, name 
VARCHAR(100))",
+            "CREATE TABLE LONGVARBINARY_TEST (id integer primary key, name 
LONG VARCHAR FOR BIT DATA)"
+    );
+
+    private static final String SERVICE_ID = 
DBCPServiceSimpleImpl.class.getName();
+
+    private final static String DB_LOCATION = "target/db_schema_reg";
+
+    // This is to mimic those in DBCPProperties to avoid adding the dependency 
to nifi-dbcp-base
+    private static final PropertyDescriptor DATABASE_URL = new 
PropertyDescriptor.Builder()
+            .name("Database Connection URL")
+            .addValidator(new ConnectionUrlValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private static final PropertyDescriptor DB_USER = new 
PropertyDescriptor.Builder()
+            .name("Database User")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private static final PropertyDescriptor DB_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("Password")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+    private static final PropertyDescriptor DB_DRIVERNAME = new 
PropertyDescriptor.Builder()
+            .name("Database Driver Class Name")
+            .addValidator(new DriverClassValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    private TestRunner runner;
+
+    @BeforeAll
+    public static void setupDatabase() throws Exception {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ignore) {
+            // Do nothing, may not have existed
+        }
+
+        // Mock the DBCP Controller Service so we can control the Results
+        DBCPService setupDbService = spy(new 
DBCPServiceSimpleImpl(DB_LOCATION));
+
+        final Connection conn = setupDbService.getConnection();
+        final Statement stmt = conn.createStatement();
+        for (String createTableStatement : CREATE_TABLE_STATEMENTS) {
+            stmt.execute(createTableStatement);
+        }
+
+        stmt.close();
+        conn.close();
+    }
+
+    @AfterAll
+    public static void shutdownDatabase() throws Exception {
+        try {
+            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + 
";shutdown=true");
+        } catch (Exception ignore) {
+            // Do nothing, this is what happens at Derby shutdown
+        }
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ignore) {
+            // Do nothing, may not have existed
+        }
+        System.clearProperty("derby.stream.error.file");
+    }
+
+    @BeforeEach
+    public void setService() throws InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        DBCPService dbcp = new DBCPServiceSimpleImpl(DB_LOCATION);
+        runner.addControllerService(SERVICE_ID, dbcp);
+
+        final String url = String.format("jdbc:derby:%s;create=false", 
DB_LOCATION);
+        runner.setProperty(dbcp, DATABASE_URL, url);
+        runner.setProperty(dbcp, DB_USER, String.class.getSimpleName());
+        runner.setProperty(dbcp, DB_PASSWORD, String.class.getName());
+        runner.setProperty(dbcp, DB_DRIVERNAME, 
"org.apache.derby.jdbc.EmbeddedDriver");
+        runner.enableControllerService(dbcp);
+    }
+
+    @Test
+    public void testGetSchemaExists() throws Exception {
+        DatabaseTableSchemaRegistry dbSchemaRegistry = new 
DatabaseTableSchemaRegistry();
+        runner.addControllerService("schemaRegistry", dbSchemaRegistry);
+        runner.setProperty(dbSchemaRegistry, 
DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID);
+        runner.setProperty(dbSchemaRegistry, 
DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1");
+        runner.enableControllerService(dbSchemaRegistry);
+        SchemaIdentifier schemaIdentifier = new 
StandardSchemaIdentifier.Builder()
+                .name("PERSONS")
+                .build();
+        RecordSchema recordSchema = 
dbSchemaRegistry.retrieveSchema(schemaIdentifier);
+        assertNotNull(recordSchema);
+        Optional<RecordField> recordField = recordSchema.getField("ID");
+        assertTrue(recordField.isPresent());
+        assertEquals(RecordFieldType.INT.getDataType(), 
recordField.get().getDataType());
+        recordField = recordSchema.getField("NAME");
+        assertTrue(recordField.isPresent());
+        assertEquals(RecordFieldType.STRING.getDataType(), 
recordField.get().getDataType());
+        recordField = recordSchema.getField("CODE");
+        assertTrue(recordField.isPresent());
+        assertEquals(RecordFieldType.INT.getDataType(), 
recordField.get().getDataType());
+        recordField = recordSchema.getField("DT");
+        assertTrue(recordField.isPresent());
+        assertEquals(RecordFieldType.DATE.getDataType(), 
recordField.get().getDataType());
+        // Get nonexistent field
+        recordField = recordSchema.getField("NOT_A_FIELD");
+        assertFalse(recordField.isPresent());
+    }
+
+    @Test
+    public void testGetSchemaNotExists() throws Exception {
+        DatabaseTableSchemaRegistry dbSchemaRegistry = new 
DatabaseTableSchemaRegistry();
+        runner.addControllerService("schemaRegistry", dbSchemaRegistry);
+        runner.setProperty(dbSchemaRegistry, 
DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID);
+        runner.setProperty(dbSchemaRegistry, 
DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1");
+        runner.enableControllerService(dbSchemaRegistry);
+        SchemaIdentifier schemaIdentifier = new 
StandardSchemaIdentifier.Builder()
+                .name("NOT_A_TABLE")
+                .build();
+        assertThrows(SchemaNotFoundException.class, () -> 
dbSchemaRegistry.retrieveSchema(schemaIdentifier));
+    }
+
+    private static class DBCPServiceSimpleImpl extends 
AbstractControllerService implements DBCPService {

Review Comment:
   Recommend avoiding the use of `Impl` on class names, even for testing.
   ```suggestion
       private static class SimpleDBCPService extends AbstractControllerService implements DBCPService {
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/pom.xml:
##########
@@ -25,6 +25,7 @@
     <modules>
         <module>nifi-dbcp-service</module>
         <module>nifi-hikari-dbcp-service</module>
+        <module>nifi-db-schema-registry-service</module>

Review Comment:
   Although this uses Database Connection Pools, it does not seem to fit in 
this bundle and NAR. Because it relies only on standard service interfaces, it 
should be a small addition as a separate NAR. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to