exceptionfactory commented on code in PR #8042: URL: https://github.com/apache/nifi/pull/8042#discussion_r1415801708
########## nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-db-schema-registry-service/src/main/java/org/apache/nifi/db/schemaregistry/DatabaseTableSchemaRegistry.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.db.schemaregistry; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +@Tags({"schema", "registry", "database", "table"}) +@CapabilityDescription("Provides a service for generating a record schema from a database table definition. The service is configured " + + "to use a table name and a database connection fetches the table metadata (i.e. table definition) such as column names, data types, " + + "nullability, etc.") +public class DatabaseTableSchemaRegistry extends AbstractControllerService implements SchemaRegistry { + + private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME); + + static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("Database Connection Pooling Service") + .displayName("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain a connection to the database for retrieving table information.") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder() + .name("Catalog Name") + .displayName("Catalog Name") + .description("The name of the catalog used to locate the desired table. This may not apply for the database that you are querying. In this case, leave the field empty. Note that if the " + + "property is set and the database is case-sensitive, the catalog name must match the database's catalog name exactly.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder() + .name("Schema Name") + .displayName("Schema Name") + .description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the " + + "property is set and the database is case-sensitive, the schema name must match the database's schema name exactly. Also notice that if the same table name exists in multiple " + + "schemas and Schema Name is not specified, the service will find those tables and give an error if the different tables have the same column name(s).") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected List<PropertyDescriptor> propDescriptors = Collections.unmodifiableList(Arrays.asList( + DBCP_SERVICE, + CATALOG_NAME, + SCHEMA_NAME + )); + + private volatile DBCPService dbcpService; + private volatile String dbCatalogName; + private volatile String dbSchemaName; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propDescriptors; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + dbCatalogName = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue(); + dbSchemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions().getValue(); + } + + @Override + public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + if (schemaIdentifier.getName().isPresent()) { + return retrieveSchemaByName(schemaIdentifier); + } else { + throw new SchemaNotFoundException("This Schema Registry only supports retrieving a schema by name."); + } + } + + @Override + public Set<SchemaField> getSuppliedSchemaFields() { + return schemaFields; + } + + RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + final Optional<String> schemaName = schemaIdentifier.getName(); + if (schemaName.isEmpty()) { + throw new SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present"); + } + + final String tableName = schemaName.get(); + try { + try (final Connection conn = dbcpService.getConnection()) { + final DatabaseMetaData databaseMetaData = conn.getMetaData(); + return getRecordSchemaFromMetadata(databaseMetaData, tableName); + } + } catch (SQLException sqle) { + throw new IOException("Error retrieving schema for table " + schemaName.get(), sqle); + } + } + + private RecordSchema getRecordSchemaFromMetadata(final DatabaseMetaData databaseMetaData, final String tableName) throws SQLException, SchemaNotFoundException { + try (final ResultSet columnResultSet = databaseMetaData.getColumns(dbCatalogName, dbSchemaName, tableName, "%")) { + + final List<RecordField> recordFields = new ArrayList<>(); + while (columnResultSet.next()) { + recordFields.add(createRecordFieldFromColumn(columnResultSet)); + } + + // If no columns are found, check that the table exists + if (recordFields.isEmpty()) { + checkTableExists(databaseMetaData, tableName); + } + return new SimpleRecordSchema(recordFields); + } + } + + private RecordField createRecordFieldFromColumn(final ResultSet columnResultSet) throws SQLException { + // COLUMN_DEF must be read first to work around Oracle bug, see NIFI-4279 for details + final String defaultValue = columnResultSet.getString("COLUMN_DEF"); + final String columnName = columnResultSet.getString("COLUMN_NAME"); + final int dataType = columnResultSet.getInt("DATA_TYPE"); + final String nullableValue = columnResultSet.getString("IS_NULLABLE"); + final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty(); + return new RecordField( + columnName, + DataTypeUtils.getDataTypeFromSQLTypeValue(dataType), + defaultValue, + isNullable); + } + + private void checkTableExists(final DatabaseMetaData databaseMetaData, final String tableName) throws SchemaNotFoundException, SQLException { + try (final ResultSet tblrs = databaseMetaData.getTables(dbCatalogName, dbSchemaName, tableName, null)) { Review Comment: Recommend renaming this variable for readability. ```suggestion try (final ResultSet tables = databaseMetaData.getTables(dbCatalogName, dbSchemaName, tableName, null)) { ``` ########## nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-db-schema-registry-service/src/test/java/org/apache/nifi/db/schemaregistry/DatabaseTableSchemaRegistryTest.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.db.schemaregistry; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.ConnectionUrlValidator; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.dbcp.DriverClassValidator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.serialization.record.StandardSchemaIdentifier; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.spy; + +public class DatabaseTableSchemaRegistryTest { + + private static final List<String> CREATE_TABLE_STATEMENTS = Arrays.asList( + "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)", + "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)", + "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)", + "CREATE TABLE UUID_TEST (id integer primary key, name VARCHAR(100))", + "CREATE TABLE LONGVARBINARY_TEST (id integer primary key, name LONG VARCHAR FOR BIT DATA)" + ); + + private static final String SERVICE_ID = DBCPServiceSimpleImpl.class.getName(); + + private final static String DB_LOCATION = "target/db_schema_reg"; + + // This is to mimic those in DBCPProperties to avoid adding the dependency to nifi-dbcp-base + private static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() + .name("Database Connection URL") + .addValidator(new ConnectionUrlValidator()) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() + .name("Database User") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + private static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder() + .name("Database Driver Class Name") + .addValidator(new DriverClassValidator()) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private TestRunner runner; + + @BeforeAll + public static void setupDatabase() throws Exception { + System.setProperty("derby.stream.error.file", "target/derby.log"); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ignore) { + // Do nothing, may not have existed + } + + // Mock the DBCP Controller Service so we can control the Results + DBCPService setupDbService = spy(new DBCPServiceSimpleImpl(DB_LOCATION)); + + final Connection conn = setupDbService.getConnection(); + final Statement stmt = conn.createStatement(); + for (String createTableStatement : CREATE_TABLE_STATEMENTS) { + stmt.execute(createTableStatement); + } + + stmt.close(); + conn.close(); + } + + @AfterAll + public static void shutdownDatabase() throws Exception { Review Comment: The `Exception` is not thrown and can be removed: ```suggestion public static void shutdownDatabase() { ``` ########## nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-db-schema-registry-service/src/test/java/org/apache/nifi/db/schemaregistry/DatabaseTableSchemaRegistryTest.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.db.schemaregistry; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.ConnectionUrlValidator; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.dbcp.DriverClassValidator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.serialization.record.StandardSchemaIdentifier; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.spy; + +public class DatabaseTableSchemaRegistryTest { + + private static final List<String> CREATE_TABLE_STATEMENTS = Arrays.asList( + "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)", + "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)", + "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)", + "CREATE TABLE UUID_TEST (id integer primary key, name VARCHAR(100))", + "CREATE TABLE LONGVARBINARY_TEST (id integer primary key, name LONG VARCHAR FOR BIT DATA)" + ); + + private static final String SERVICE_ID = DBCPServiceSimpleImpl.class.getName(); + + private final static String DB_LOCATION = "target/db_schema_reg"; + + // This is to mimic those in DBCPProperties to avoid adding the dependency to nifi-dbcp-base + private static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() + .name("Database Connection URL") + .addValidator(new ConnectionUrlValidator()) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() + .name("Database User") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + private static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder() + .name("Database Driver Class Name") + .addValidator(new DriverClassValidator()) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private TestRunner runner; + + @BeforeAll + public static void setupDatabase() throws Exception { + System.setProperty("derby.stream.error.file", "target/derby.log"); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ignore) { + // Do nothing, may not have existed + } + + // Mock the DBCP Controller Service so we can control the Results + DBCPService setupDbService = spy(new DBCPServiceSimpleImpl(DB_LOCATION)); + + final Connection conn = setupDbService.getConnection(); + final Statement stmt = conn.createStatement(); + for (String createTableStatement : CREATE_TABLE_STATEMENTS) { + stmt.execute(createTableStatement); + } + + stmt.close(); + conn.close(); + } + + @AfterAll + public static void shutdownDatabase() throws Exception { + try { + DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true"); + } catch (Exception ignore) { + // Do nothing, this is what happens at Derby shutdown + } + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ignore) { + // Do nothing, may not have existed + } + System.clearProperty("derby.stream.error.file"); + } + + @BeforeEach + public void setService() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + DBCPService dbcp = new DBCPServiceSimpleImpl(DB_LOCATION); + runner.addControllerService(SERVICE_ID, dbcp); + + final String url = String.format("jdbc:derby:%s;create=false", DB_LOCATION); + runner.setProperty(dbcp, DATABASE_URL, url); + runner.setProperty(dbcp, DB_USER, String.class.getSimpleName()); + runner.setProperty(dbcp, DB_PASSWORD, String.class.getName()); + runner.setProperty(dbcp, DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); + runner.enableControllerService(dbcp); + } + + @Test + public void testGetSchemaExists() throws Exception { + DatabaseTableSchemaRegistry dbSchemaRegistry = new DatabaseTableSchemaRegistry(); + runner.addControllerService("schemaRegistry", dbSchemaRegistry); + runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID); + runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1"); + runner.enableControllerService(dbSchemaRegistry); + SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder() + .name("PERSONS") + .build(); + RecordSchema recordSchema = dbSchemaRegistry.retrieveSchema(schemaIdentifier); + assertNotNull(recordSchema); + Optional<RecordField> recordField = recordSchema.getField("ID"); + assertTrue(recordField.isPresent()); + assertEquals(RecordFieldType.INT.getDataType(), recordField.get().getDataType()); + recordField = recordSchema.getField("NAME"); + assertTrue(recordField.isPresent()); + assertEquals(RecordFieldType.STRING.getDataType(), recordField.get().getDataType()); + recordField = recordSchema.getField("CODE"); + assertTrue(recordField.isPresent()); + assertEquals(RecordFieldType.INT.getDataType(), recordField.get().getDataType()); + recordField = recordSchema.getField("DT"); + assertTrue(recordField.isPresent()); + assertEquals(RecordFieldType.DATE.getDataType(), recordField.get().getDataType()); + // Get nonexistent field + recordField = recordSchema.getField("NOT_A_FIELD"); + assertFalse(recordField.isPresent()); + } + + @Test + public void testGetSchemaNotExists() throws Exception { + DatabaseTableSchemaRegistry dbSchemaRegistry = new DatabaseTableSchemaRegistry(); + runner.addControllerService("schemaRegistry", dbSchemaRegistry); + runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID); + runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1"); + runner.enableControllerService(dbSchemaRegistry); + SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder() + .name("NOT_A_TABLE") + .build(); + assertThrows(SchemaNotFoundException.class, () -> dbSchemaRegistry.retrieveSchema(schemaIdentifier)); + } + + private static class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { + + private final String databaseLocation; + + public DBCPServiceSimpleImpl(final String databaseLocation) { + this.databaseLocation = databaseLocation; + } + + @Override + public String getIdentifier() { + return this.getClass().getName(); + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); Review Comment: Recommend declaring this class name once and reusing throughout the test class. ########## nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-db-schema-registry-service/src/test/java/org/apache/nifi/db/schemaregistry/DatabaseTableSchemaRegistryTest.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.db.schemaregistry; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.ConnectionUrlValidator; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.dbcp.DriverClassValidator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.serialization.record.StandardSchemaIdentifier; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.spy; + +public class DatabaseTableSchemaRegistryTest { + + private static final List<String> CREATE_TABLE_STATEMENTS = Arrays.asList( + "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)", + "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)", + "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)", + "CREATE TABLE UUID_TEST (id integer primary key, name VARCHAR(100))", + "CREATE TABLE LONGVARBINARY_TEST (id integer primary key, name LONG VARCHAR FOR BIT DATA)" + ); + + private static final String SERVICE_ID = DBCPServiceSimpleImpl.class.getName(); + + private final static String DB_LOCATION = "target/db_schema_reg"; + + // This is to mimic those in DBCPProperties to avoid adding the dependency to nifi-dbcp-base + private static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() + .name("Database Connection URL") + .addValidator(new ConnectionUrlValidator()) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() + .name("Database User") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + private static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder() + .name("Database Driver Class Name") + .addValidator(new DriverClassValidator()) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private TestRunner runner; + + @BeforeAll + public static void setupDatabase() throws Exception { + System.setProperty("derby.stream.error.file", "target/derby.log"); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ignore) { + // Do nothing, may not have existed + } + + // Mock the DBCP Controller Service so we can control the Results + DBCPService setupDbService = spy(new DBCPServiceSimpleImpl(DB_LOCATION)); + + final Connection conn = setupDbService.getConnection(); + final Statement stmt = conn.createStatement(); + for (String createTableStatement : CREATE_TABLE_STATEMENTS) { + stmt.execute(createTableStatement); + } + + stmt.close(); + conn.close(); + } + + @AfterAll + public static void shutdownDatabase() throws Exception { + try { + DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true"); + } catch (Exception ignore) { + // Do nothing, this is what happens at Derby shutdown + } + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ignore) { + // Do nothing, may not have existed + } + System.clearProperty("derby.stream.error.file"); + } + + @BeforeEach + public void setService() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + DBCPService dbcp = new DBCPServiceSimpleImpl(DB_LOCATION); + runner.addControllerService(SERVICE_ID, dbcp); + + final String url = String.format("jdbc:derby:%s;create=false", DB_LOCATION); + runner.setProperty(dbcp, DATABASE_URL, url); + runner.setProperty(dbcp, DB_USER, String.class.getSimpleName()); + runner.setProperty(dbcp, DB_PASSWORD, String.class.getName()); + runner.setProperty(dbcp, DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); + runner.enableControllerService(dbcp); + } + + @Test + public void testGetSchemaExists() throws Exception { + DatabaseTableSchemaRegistry dbSchemaRegistry = new DatabaseTableSchemaRegistry(); + runner.addControllerService("schemaRegistry", dbSchemaRegistry); + runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID); + runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1"); + runner.enableControllerService(dbSchemaRegistry); + SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder() + .name("PERSONS") + .build(); + RecordSchema recordSchema = dbSchemaRegistry.retrieveSchema(schemaIdentifier); + assertNotNull(recordSchema); + Optional<RecordField> recordField = recordSchema.getField("ID"); + assertTrue(recordField.isPresent()); + assertEquals(RecordFieldType.INT.getDataType(), recordField.get().getDataType()); + recordField = recordSchema.getField("NAME"); + assertTrue(recordField.isPresent()); + assertEquals(RecordFieldType.STRING.getDataType(), recordField.get().getDataType()); + recordField = recordSchema.getField("CODE"); + assertTrue(recordField.isPresent()); + assertEquals(RecordFieldType.INT.getDataType(), recordField.get().getDataType()); + recordField = recordSchema.getField("DT"); + assertTrue(recordField.isPresent()); + assertEquals(RecordFieldType.DATE.getDataType(), recordField.get().getDataType()); + // Get nonexistent field + recordField = recordSchema.getField("NOT_A_FIELD"); + assertFalse(recordField.isPresent()); + } + + @Test + public void testGetSchemaNotExists() throws Exception { + DatabaseTableSchemaRegistry dbSchemaRegistry = new DatabaseTableSchemaRegistry(); + runner.addControllerService("schemaRegistry", dbSchemaRegistry); + runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID); + runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1"); + runner.enableControllerService(dbSchemaRegistry); + SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder() + .name("NOT_A_TABLE") + .build(); + assertThrows(SchemaNotFoundException.class, () -> dbSchemaRegistry.retrieveSchema(schemaIdentifier)); + } + + private static class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { Review Comment: Recommend avoiding the use of `Impl` on class names, even for testing. ```suggestion private static class SimpleDBCPService extends AbstractControllerService implements DBCPService { ``` ########## nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/pom.xml: ########## @@ -25,6 +25,7 @@ <modules> <module>nifi-dbcp-service</module> <module>nifi-hikari-dbcp-service</module> + <module>nifi-db-schema-registry-service</module> Review Comment: Although this uses Database Connection Pools, it does not seem like it fits in this bundle and NAR. As it relies only on standard service interfaces, it should be a small addition as a separate NAR. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
