http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java new file mode 100644 index 0000000..aeb2755 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.admin; + +import java.io.File; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.commons.lang3.StringUtils; +import org.h2.jdbcx.JdbcConnectionPool; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.FactoryBean; + +/** + * Spring factory bean that exposes the H2 connection pool backing the audit database. The pool is created, + * and the audit schema is built when the ACTION table is missing, the first time {@link #getObject()} is called. + */ +public class AuditDataSourceFactoryBean implements FactoryBean { + + private static final Logger logger = LoggerFactory.getLogger(AuditDataSourceFactoryBean.class); + private static final String NF_USERNAME_PASSWORD = "nf"; + private static final int MAX_CONNECTIONS = 5; + + // database file name + private static final String AUDIT_DATABASE_FILE_NAME = "nifi-audit"; + + // ------------ + // action table + // ------------ + private static final String CREATE_ACTION_TABLE = "CREATE TABLE ACTION (" + + "ID INT NOT NULL PRIMARY KEY AUTO_INCREMENT, " + + "USER_DN VARCHAR2(255) NOT NULL, " + + "USER_NAME VARCHAR2(100) NOT NULL, " + + "SOURCE_ID VARCHAR2(100) NOT NULL, " + + "SOURCE_NAME VARCHAR2(1000) NOT NULL, " + + "SOURCE_TYPE VARCHAR2(1000) NOT NULL, " + + "OPERATION VARCHAR2(50) NOT NULL, " + + "ACTION_TIMESTAMP TIMESTAMP NOT NULL " + + ")"; + + // ----------------- + // component details + // ----------------- + private static final String CREATE_PROCESSOR_DETAILS_TABLE = "CREATE TABLE PROCESSOR_DETAILS (" + + "ACTION_ID INT NOT NULL PRIMARY KEY, " + + "TYPE VARCHAR2(1000) NOT NULL, " + + "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)" + + ")"; + + private static final String CREATE_REMOTE_PROCESS_GROUP_DETAILS_TABLE = "CREATE TABLE REMOTE_PROCESS_GROUP_DETAILS (" + + "ACTION_ID INT NOT NULL PRIMARY KEY, " + + "URI VARCHAR2(2500) NOT NULL, " + + "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)" + + ")"; + + // -------------- + // action details + // -------------- + private static final String CREATE_MOVE_DETAILS_TABLE = "CREATE TABLE 
MOVE_DETAILS (" + + "ACTION_ID INT NOT NULL PRIMARY KEY, " + + "GROUP_ID VARCHAR2(100) NOT NULL, " + + "GROUP_NAME VARCHAR2(1000) NOT NULL, " + + "PREVIOUS_GROUP_ID VARCHAR2(100) NOT NULL, " + + "PREVIOUS_GROUP_NAME VARCHAR2(1000) NOT NULL, " + + "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)" + + ")"; + + private static final String CREATE_CONFIGURE_DETAILS_TABLE = "CREATE TABLE CONFIGURE_DETAILS (" + + "ACTION_ID INT NOT NULL PRIMARY KEY, " + + "NAME VARCHAR2(1000) NOT NULL, " + + "VALUE VARCHAR2(5000), " + + "PREVIOUS_VALUE VARCHAR2(5000), " + + "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)" + + ")"; + + private static final String CREATE_CONNECT_DETAILS_TABLE = "CREATE TABLE CONNECT_DETAILS (" + + "ACTION_ID INT NOT NULL PRIMARY KEY, " + + "SOURCE_ID VARCHAR2(100) NOT NULL, " + + "SOURCE_NAME VARCHAR2(1000), " + + "SOURCE_TYPE VARCHAR2(1000) NOT NULL, " + + "RELATIONSHIP VARCHAR2(1000), " + + "DESTINATION_ID VARCHAR2(100) NOT NULL, " + + "DESTINATION_NAME VARCHAR2(1000), " + + "DESTINATION_TYPE VARCHAR2(1000) NOT NULL, " + + "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)" + + ")"; + + private static final String CREATE_PURGE_DETAILS_TABLE = "CREATE TABLE PURGE_DETAILS (" + + "ACTION_ID INT NOT NULL PRIMARY KEY, " + + "END_DATE TIMESTAMP NOT NULL, " + + "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)" + + ")"; + + // cached pool; stays null until initialization has completed successfully + private JdbcConnectionPool connectionPool; + + private NiFiProperties properties; + + /** + * Returns the connection pool for the audit database. On the first successful call the pool is created and, + * if the ACTION table does not exist yet, the audit tables are created; later calls return the cached pool. + * If initialization fails the pool is disposed and not cached, so a subsequent call starts over. + * + * @return the {@link JdbcConnectionPool} for the audit database + * @throws Exception a NullPointerException if the database directory property is not set, or the + * SQLException raised while checking or building the schema + */ + @Override + public Object getObject() throws Exception { + if (connectionPool == null) { + + // locate the repository directory + String repositoryDirectoryPath = properties.getProperty(NiFiProperties.REPOSITORY_DATABASE_DIRECTORY); + + // ensure the repository directory is specified + if (repositoryDirectoryPath == null) { + throw new NullPointerException("Database directory must be specified."); + } + + // create a handle to the repository directory + File repositoryDirectory = new File(repositoryDirectoryPath); + + // get a handle to the database file + File 
databaseFile = new File(repositoryDirectory, AUDIT_DATABASE_FILE_NAME); + + // format the database url + String databaseUrl = "jdbc:h2:" + databaseFile + ";AUTOCOMMIT=OFF;DB_CLOSE_ON_EXIT=FALSE;LOCK_MODE=3"; + String databaseUrlAppend = properties.getProperty(NiFiProperties.H2_URL_APPEND); + if (StringUtils.isNotBlank(databaseUrlAppend)) { + databaseUrl += databaseUrlAppend; + } + + // create the pool locally; it is only published to the field once the schema has been verified + final JdbcConnectionPool pool = JdbcConnectionPool.create(databaseUrl, NF_USERNAME_PASSWORD, NF_USERNAME_PASSWORD); + pool.setMaxConnections(MAX_CONNECTIONS); + + boolean initialized = false; + Connection connection = null; + ResultSet rs = null; + Statement statement = null; + try { + // get a connection + connection = pool.getConnection(); + connection.setAutoCommit(false); + + // determine if the tables need to be created + rs = connection.getMetaData().getTables(null, null, "ACTION", null); + if (!rs.next()) { + logger.info("Database not built for repository: " + databaseUrl + ". Building now..."); + RepositoryUtils.closeQuietly(rs); + + // create a statement for initializing the database + statement = connection.createStatement(); + + // action table + statement.execute(CREATE_ACTION_TABLE); + + // component details + statement.execute(CREATE_PROCESSOR_DETAILS_TABLE); + statement.execute(CREATE_REMOTE_PROCESS_GROUP_DETAILS_TABLE); + + // action details + statement.execute(CREATE_MOVE_DETAILS_TABLE); + statement.execute(CREATE_CONFIGURE_DETAILS_TABLE); + statement.execute(CREATE_CONNECT_DETAILS_TABLE); + statement.execute(CREATE_PURGE_DETAILS_TABLE); + } else { + logger.info("Existing database found and connected to at: " + databaseUrl); + } + + // commit any changes + connection.commit(); + initialized = true; + } catch (SQLException sqle) { + RepositoryUtils.rollback(connection, logger); + throw sqle; + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + RepositoryUtils.closeQuietly(connection); + + // a failed initialization must not leave a half-built pool open or cached + if (!initialized) { + disposeQuietly(pool); + } + } + + connectionPool = pool; + } + + return connectionPool; + } + + @Override + public Class 
getObjectType() { + return JdbcConnectionPool.class; + } + + @Override + public boolean isSingleton() { + return true; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } + + /** + * Disposes resources. + */ + public void shutdown() { + + // shutdown the connection pool + if (connectionPool != null) { + disposeQuietly(connectionPool); + } + } + + /** + * Disposes of the given pool, logging any failure instead of propagating it. + * + * @param pool pool to dispose + */ + private static void disposeQuietly(final JdbcConnectionPool pool) { + try { + pool.dispose(); + } catch (Exception e) { + logger.warn("Unable to dispose of connection pool: " + e.getMessage()); + if (logger.isDebugEnabled()) { + // stack trace at debug level, matching the guard above and RepositoryUtils.rollback + logger.debug(StringUtils.EMPTY, e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/RepositoryUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/RepositoryUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/RepositoryUtils.java new file mode 100644 index 0000000..f678d5d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/RepositoryUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.admin; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.slf4j.Logger; + +/** + * Static helpers for rolling back and releasing the JDBC resources used by the repository. + */ +public class RepositoryUtils { + + /** + * Rolls back the given connection when one is supplied. A failed rollback is reported through the + * supplied logger (message at warn, stack trace at debug) and is never thrown to the caller. + * + * @param conn connection to roll back; nothing happens when null + * @param logger receives the report of a failed rollback + */ + public static void rollback(final Connection conn, final Logger logger) { + if (conn == null) { + return; + } + + try { + conn.rollback(); + } catch (final SQLException rollbackFailure) { + logger.warn("The following problem occurred while trying to rollback " + conn + ": " + rollbackFailure.getLocalizedMessage()); + if (logger.isDebugEnabled()) { + logger.debug("", rollbackFailure); + } + } + } + + /** + * Closes the statement without logging or throwing; a null statement is accepted. + * + * @param statement to close + */ + public static void closeQuietly(final Statement statement) { + if (statement == null) { + return; + } + + try { + statement.close(); + } catch (final SQLException ignored) { /* best-effort close, deliberately ignored */ + } + } + + /** + * Closes the result set without logging or throwing; a null result set is accepted. + * + * @param resultSet to close + */ + public static void closeQuietly(final ResultSet resultSet) { + if (resultSet == null) { + return; + } + + try { + resultSet.close(); + } catch (final SQLException ignored) { /* best-effort close, deliberately ignored */ + } + } + + /** + * Closes the connection without logging or throwing; a null connection is accepted. + * + * @param conn to close + */ + public static void closeQuietly(final Connection conn) { + if (conn == null) { + return; + } + + try { + conn.close(); + } catch (final SQLException ignored) { /* best-effort close, deliberately ignored */ + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/UserDataSourceFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/UserDataSourceFactoryBean.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/UserDataSourceFactoryBean.java new file mode 100644 index 0000000..ebcf574 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/UserDataSourceFactoryBean.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.admin; + +import java.io.File; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.UUID; +import org.apache.commons.lang3.StringUtils; +import org.h2.jdbcx.JdbcConnectionPool; +import org.apache.nifi.user.NiFiUser; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.FactoryBean; + +/** + * Spring factory bean that exposes the H2 connection pool backing the user database. The first time + * {@link #getObject()} is called the pool is created, the USER and AUTHORITY tables are built and seeded with + * the anonymous user when the USER table is missing, and the anonymous provenance role is added if absent. + */ +public class UserDataSourceFactoryBean implements FactoryBean { + + private static final Logger logger = LoggerFactory.getLogger(UserDataSourceFactoryBean.class); + private static final String NF_USERNAME_PASSWORD = "nf"; + private static final int MAX_CONNECTIONS = 5; + + // database file name + private static final String USER_DATABASE_FILE_NAME = "nifi-users"; + + private static final String CREATE_USER_TABLE = "CREATE TABLE USER (" + + "ID VARCHAR2(100) NOT NULL PRIMARY KEY, " + + "DN VARCHAR2(255) NOT NULL UNIQUE, " + + "USER_NAME VARCHAR2(100) NOT NULL, " + + "USER_GROUP VARCHAR2(100), " + + "CREATION TIMESTAMP NOT NULL, " + + "LAST_ACCESSED TIMESTAMP, " + + "LAST_VERIFIED TIMESTAMP, " + + "JUSTIFICATION VARCHAR2(500) NOT NULL, " + + "STATUS VARCHAR2(10) NOT NULL" + + ")"; + + private static final String CREATE_AUTHORITY_TABLE = "CREATE TABLE AUTHORITY (" + + "ID INT NOT NULL PRIMARY KEY AUTO_INCREMENT, " + + "USER_ID VARCHAR2(100) NOT NULL, " + + "ROLE VARCHAR2(50) NOT NULL, " + + "FOREIGN KEY (USER_ID) REFERENCES USER (ID), " + + "CONSTRAINT USER_ROLE_UNIQUE_CONSTRAINT UNIQUE (USER_ID, ROLE)" + + ")"; + + private static final String INSERT_ANONYMOUS_USER = "INSERT INTO USER (" + + "ID, DN, USER_NAME, CREATION, LAST_VERIFIED, JUSTIFICATION, STATUS" + + ") VALUES (" + + "'" + UUID.randomUUID().toString() + "', " + + "'" + NiFiUser.ANONYMOUS_USER_DN + "', " + + "'" + NiFiUser.ANONYMOUS_USER_DN + "', " + + "NOW(), " + + "NOW(), " + + "'Anonymous user needs no justification', " + + 
"'ACTIVE'" + + ")"; + + private static final String INSERT_ANONYMOUS_MONITOR_AUTHORITY = "INSERT INTO AUTHORITY (" + + "USER_ID, ROLE" + + ") VALUES (" + + "(SELECT ID FROM USER WHERE DN = '" + NiFiUser.ANONYMOUS_USER_DN + "'), " + + "'ROLE_MONITOR'" + + ")"; + + private static final String INSERT_ANONYMOUS_DFM_AUTHORITY = "INSERT INTO AUTHORITY (" + + "USER_ID, ROLE" + + ") VALUES (" + + "(SELECT ID FROM USER WHERE DN = '" + NiFiUser.ANONYMOUS_USER_DN + "'), " + + "'ROLE_DFM'" + + ")"; + + private static final String INSERT_ANONYMOUS_ADMIN_AUTHORITY = "INSERT INTO AUTHORITY (" + + "USER_ID, ROLE" + + ") VALUES (" + + "(SELECT ID FROM USER WHERE DN = '" + NiFiUser.ANONYMOUS_USER_DN + "'), " + + "'ROLE_ADMIN'" + + ")"; + + private static final String INSERT_ANONYMOUS_NIFI_AUTHORITY = "INSERT INTO AUTHORITY (" + + "USER_ID, ROLE" + + ") VALUES (" + + "(SELECT ID FROM USER WHERE DN = '" + NiFiUser.ANONYMOUS_USER_DN + "'), " + + "'ROLE_NIFI'" + + ")"; + + private static final String INSERT_ANONYMOUS_PROVENANCE_AUTHORITY = "INSERT INTO AUTHORITY (" + + "USER_ID, ROLE" + + ") VALUES (" + + "(SELECT ID FROM USER WHERE DN = '" + NiFiUser.ANONYMOUS_USER_DN + "'), " + + "'ROLE_PROVENANCE'" + + ")"; + + private static final String SELECT_ANONYMOUS_PROVENANCE_AUTHORITY = "SELECT * FROM AUTHORITY " + + "WHERE " + + "USER_ID = (SELECT ID FROM USER WHERE DN = '" + NiFiUser.ANONYMOUS_USER_DN + "') " + + "AND " + + "ROLE = 'ROLE_PROVENANCE'"; + + // cached pool; stays null until initialization has completed successfully + private JdbcConnectionPool connectionPool; + + private NiFiProperties properties; + + /** + * Returns the connection pool for the user database. On the first successful call the pool is created, the + * schema is built and seeded if the USER table does not exist, and the anonymous provenance role is inserted + * when missing; later calls return the cached pool. If initialization fails the pool is disposed and not + * cached, so a subsequent call starts over. + * + * @return the {@link JdbcConnectionPool} for the user database + * @throws Exception a NullPointerException if the database directory property is not set, or the + * SQLException raised while checking, building or updating the schema + */ + @Override + public Object getObject() throws Exception { + if (connectionPool == null) { + + // locate the repository directory + String repositoryDirectoryPath = properties.getProperty(NiFiProperties.REPOSITORY_DATABASE_DIRECTORY); + + // ensure the repository directory is specified + if (repositoryDirectoryPath == null) { + throw new NullPointerException("Database directory must be specified."); + } + + // create a handle to the repository directory + 
File repositoryDirectory = new File(repositoryDirectoryPath); + + // create a handle to the database directory and file + File databaseFile = new File(repositoryDirectory, USER_DATABASE_FILE_NAME); + String databaseUrl = getDatabaseUrl(databaseFile); + + // create the pool locally; it is only published to the field once the schema has been verified + final JdbcConnectionPool pool = JdbcConnectionPool.create(databaseUrl, NF_USERNAME_PASSWORD, NF_USERNAME_PASSWORD); + pool.setMaxConnections(MAX_CONNECTIONS); + + boolean initialized = false; + Connection connection = null; + ResultSet rs = null; + Statement statement = null; + try { + // get a connection + connection = pool.getConnection(); + connection.setAutoCommit(false); + + // create a statement for creating/updating the database + statement = connection.createStatement(); + + // determine if the tables need to be created + rs = connection.getMetaData().getTables(null, null, "USER", null); + if (!rs.next()) { + logger.info("Database not built for repository: " + databaseUrl + ". Building now..."); + + // create the tables + statement.execute(CREATE_USER_TABLE); + statement.execute(CREATE_AUTHORITY_TABLE); + + // seed the anonymous user + statement.execute(INSERT_ANONYMOUS_USER); + statement.execute(INSERT_ANONYMOUS_MONITOR_AUTHORITY); + statement.execute(INSERT_ANONYMOUS_DFM_AUTHORITY); + statement.execute(INSERT_ANONYMOUS_ADMIN_AUTHORITY); + statement.execute(INSERT_ANONYMOUS_NIFI_AUTHORITY); + } else { + logger.info("Existing database found and connected to at: " + databaseUrl); + } + + // close the previous result set + RepositoryUtils.closeQuietly(rs); + + // merge in the provenance role to handle existing databases + rs = statement.executeQuery(SELECT_ANONYMOUS_PROVENANCE_AUTHORITY); + if (!rs.next()) { + statement.execute(INSERT_ANONYMOUS_PROVENANCE_AUTHORITY); + } + + // commit any changes + connection.commit(); + initialized = true; + } catch (SQLException sqle) { + RepositoryUtils.rollback(connection, logger); + throw sqle; + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + 
RepositoryUtils.closeQuietly(connection); + + // a failed initialization must not leave a half-built pool open or cached + if (!initialized) { + disposeQuietly(pool); + } + } + + connectionPool = pool; + } + + return connectionPool; + } + + /** + * Builds the H2 JDBC url for the given database file, appending the optional url suffix from the properties. + * + * @param databaseFile database file the url points at + * @return the JDBC url + */ + private String getDatabaseUrl(File databaseFile) { + String databaseUrl = "jdbc:h2:" + databaseFile + ";AUTOCOMMIT=OFF;DB_CLOSE_ON_EXIT=FALSE;LOCK_MODE=3"; + String databaseUrlAppend = properties.getProperty(NiFiProperties.H2_URL_APPEND); + if (StringUtils.isNotBlank(databaseUrlAppend)) { + databaseUrl += databaseUrlAppend; + } + return databaseUrl; + } + + @Override + public Class getObjectType() { + return JdbcConnectionPool.class; + } + + @Override + public boolean isSingleton() { + return true; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } + + /** + * Disposes resources. + */ + public void shutdown() { + + // shutdown the connection pool + if (connectionPool != null) { + disposeQuietly(connectionPool); + } + + } + + /** + * Disposes of the given pool, logging any failure instead of propagating it. + * + * @param pool pool to dispose + */ + private static void disposeQuietly(final JdbcConnectionPool pool) { + try { + pool.dispose(); + } catch (Exception e) { + logger.warn("Unable to dispose of connection pool: " + e.getMessage()); + if (logger.isDebugEnabled()) { + // stack trace at debug level, matching the guard above and RepositoryUtils.rollback + logger.debug(StringUtils.EMPTY, e); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/ActionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/ActionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/ActionDAO.java new file mode 100644 index 0000000..925dc80 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/ActionDAO.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import org.apache.nifi.action.Action; +import org.apache.nifi.history.HistoryQuery; +import org.apache.nifi.history.History; +import org.apache.nifi.history.PreviousValue; + +/** + * Action data access: persisting, querying, looking up and deleting actions. + */ +public interface ActionDAO { + + /** + * Persists the specified action. + * + * @param action to persist + * @throws DataAccessException if unable to persist + */ + void createAction(Action action) throws DataAccessException; + + /** + * Finds all actions that meet the specified criteria. + * + * @param actionQuery query for actions + * @return History of actions + * @throws DataAccessException if a data access error occurs + */ + History findActions(HistoryQuery actionQuery) throws DataAccessException; + + /** + * Finds the previous values recorded for the specified component. + * + * @param componentId to get previous values of + * @return previous values for the specified component as lists keyed by a String (presumably the + * property name - TODO confirm against the implementation). The original contract states that an + * empty result is returned if there are none; whether that is an empty map is not visible here + */ + Map<String, List<PreviousValue>> getPreviousValues(String componentId); + + /** + * Finds the specified action. + * + * @param actionId action identifier + * @return Action specified + * @throws DataAccessException if a data access error occurs + */ + Action getAction(Integer actionId) throws DataAccessException; + + /** + * Deletes all actions up to the specified end date. 
+ * + * @param endDate date to stop deleting at + * @throws DataAccessException if a data access error occurs + */ + void deleteActions(Date endDate) throws DataAccessException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/AuthorityDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/AuthorityDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/AuthorityDAO.java new file mode 100644 index 0000000..b80b78e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/AuthorityDAO.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao; + +import java.util.Set; +import org.apache.nifi.authorization.Authority; + +/** + * Authority data access. + */ +public interface AuthorityDAO { + + /** + * Finds all Authority for the specified user. 
+ * + * @param userId identifier of user + * @return authorities + * @throws DataAccessException if unable to access authorities + */ + Set<Authority> findAuthoritiesByUserId(String userId) throws DataAccessException; + + /** + * Creates new Authorities for the specified user in addition to authorities + * they already have. + * + * @param authorities to add to the given user + * @param userId identifier of user + * @throws DataAccessException if unable to access authorities + */ + void createAuthorities(Set<Authority> authorities, String userId) throws DataAccessException; + + /** + * Removes all Authorities for the specified user. + * + * @param userId user identifier + * @throws DataAccessException if unable to access authorities + */ + void deleteAuthorities(String userId) throws DataAccessException; + + /** + * Removes the specified Authorities for the specified user. + * + * @param authorities to remove + * @param userId user id + * @throws DataAccessException if unable to access authorities + */ + void deleteAuthorities(Set<Authority> authorities, String userId) throws DataAccessException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/DAOFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/DAOFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/DAOFactory.java new file mode 100644 index 0000000..dee4ef9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/DAOFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao; + +/** + * Factory giving access to the user, action and authority data access objects. + */ +public interface DAOFactory { + + /** + * @return the user data access object + */ + UserDAO getUserDAO(); + + /** + * @return the action data access object + */ + ActionDAO getActionDAO(); + + /** + * @return the authority data access object + */ + AuthorityDAO getAuthorityDAO(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/DataAccessException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/DataAccessException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/DataAccessException.java new file mode 100644 index 0000000..05bf4af --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/DataAccessException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao; + +/** + * Represents any error that might occur while administering NiFi accounts. + */ +public class DataAccessException extends RuntimeException { + + public DataAccessException(Throwable cause) { + super(cause); + } + + public DataAccessException(String message, Throwable cause) { + super(message, cause); + } + + public DataAccessException(String message) { + super(message); + } + + public DataAccessException() { + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/UserDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/UserDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/UserDAO.java new file mode 100644 index 0000000..6339e5a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/UserDAO.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao; + +import java.util.Date; +import java.util.Set; +import org.apache.nifi.user.AccountStatus; +import org.apache.nifi.user.NiFiUser; + +/** + * Defines the user data access object. + */ +public interface UserDAO { + + /** + * Determines whether there are any PENDING user accounts. + * + * @return true if pending + * @throws DataAccessException dae + */ + Boolean hasPendingUserAccounts() throws DataAccessException; + + /** + * Returns all users. + * + * @return all users + * @throws DataAccessException dae + */ + Set<NiFiUser> findUsers() throws DataAccessException; + + /** + * Returns all user groups. + * + * @return all group names + * @throws DataAccessException dae + */ + Set<String> findUserGroups() throws DataAccessException; + + /** + * Returns all users for the specified group. + * + * @param group group + * @return users in group + * @throws DataAccessException dae + */ + Set<NiFiUser> findUsersForGroup(String group) throws DataAccessException; + + /** + * Returns the user with the specified id. + * + * @param id user id + * @return user for the given id + * @throws DataAccessException dae + */ + NiFiUser findUserById(String id) throws DataAccessException; + + /** + * Returns the user with the specified DN. 
+ * + * @param dn user dn + * @return user + * @throws DataAccessException dae + */ + NiFiUser findUserByDn(String dn) throws DataAccessException; + + /** + * Creates a new user based off the specified NiFiUser. + * + * @param user to create + * @throws DataAccessException dae + */ + void createUser(NiFiUser user) throws DataAccessException; + + /** + * Updates the specified NiFiUser. + * + * @param user to update + * @throws DataAccessException dae + */ + void updateUser(NiFiUser user) throws DataAccessException; + + /** + * Deletes the specified user. + * + * @param id user identifier + * @throws DataAccessException dae + */ + void deleteUser(String id) throws DataAccessException; + + /** + * Sets the status of the specified group. + * + * @param group group + * @param status status + * @throws DataAccessException dae + */ + void updateGroupStatus(String group, AccountStatus status) throws DataAccessException; + + /** + * Sets the last verified time for all users in the specified group. + * + * @param group group + * @param lastVerified date last verified + * @throws DataAccessException dae + */ + void updateGroupVerification(String group, Date lastVerified) throws DataAccessException; + + /** + * Ungroups the specified group. 
+ * + * @param group to ungroup + * @throws DataAccessException dae + */ + void ungroup(String group) throws DataAccessException; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/DAOFactoryImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/DAOFactoryImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/DAOFactoryImpl.java new file mode 100644 index 0000000..2f3de0e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/DAOFactoryImpl.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.admin.dao.impl; + +import java.sql.Connection; +import org.apache.nifi.admin.dao.ActionDAO; +import org.apache.nifi.admin.dao.AuthorityDAO; +import org.apache.nifi.admin.dao.DAOFactory; +import org.apache.nifi.admin.dao.UserDAO; + +/** + * DAOFactory implementation whose DAOs all operate on the single JDBC connection supplied at construction; each getter returns a newly created DAO. + */ +public class DAOFactoryImpl implements DAOFactory { + + private final Connection connection; + + public DAOFactoryImpl(Connection connection) { + this.connection = connection; + } + + @Override + public ActionDAO getActionDAO() { + return new StandardActionDAO(connection); + } + + @Override + public AuthorityDAO getAuthorityDAO() { + return new StandardAuthorityDAO(connection); + } + + @Override + public UserDAO getUserDAO() { + return new StandardUserDAO(connection); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardActionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardActionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardActionDAO.java new file mode 100644 index 0000000..bb655eb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardActionDAO.java @@ -0,0 +1,965 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao.impl; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.nifi.action.Action; +import org.apache.nifi.action.Component; +import org.apache.nifi.action.Operation; +import org.apache.nifi.action.component.details.ComponentDetails; +import org.apache.nifi.action.component.details.ExtensionDetails; +import org.apache.nifi.action.component.details.RemoteProcessGroupDetails; +import org.apache.nifi.action.details.ActionDetails; +import org.apache.nifi.action.details.ConfigureDetails; +import org.apache.nifi.action.details.ConnectDetails; +import org.apache.nifi.action.details.MoveDetails; +import org.apache.nifi.action.details.PurgeDetails; +import org.apache.nifi.admin.RepositoryUtils; +import org.apache.nifi.admin.dao.ActionDAO; +import org.apache.nifi.admin.dao.DataAccessException; +import org.apache.nifi.history.History; +import org.apache.nifi.history.HistoryQuery; +import org.apache.nifi.history.PreviousValue; +import org.apache.commons.lang3.StringUtils; + +/** + * + */ +public class StandardActionDAO implements ActionDAO { + + // ------------ + // action table + // ------------ + private static final String INSERT_ACTION = "INSERT INTO ACTION (" + + "USER_DN, USER_NAME, SOURCE_ID, SOURCE_NAME, SOURCE_TYPE, 
OPERATION, ACTION_TIMESTAMP" + + ") VALUES (" + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?" + + ")"; + + // ----------------- + // component details + // ----------------- + private static final String INSERT_EXTENSION_DETAILS = "INSERT INTO PROCESSOR_DETAILS (" + + "ACTION_ID, TYPE" + + ") VALUES (" + + "?, " + + "?" + + ")"; + + private static final String INSERT_REMOTE_PROCESS_GROUP_DETAILS = "INSERT INTO REMOTE_PROCESS_GROUP_DETAILS (" + + "ACTION_ID, URI" + + ") VALUES (" + + "?, " + + "?" + + ")"; + + // -------------- + // action details + // -------------- + private static final String INSERT_CONFIGURE_DETAILS = "INSERT INTO CONFIGURE_DETAILS (" + + "ACTION_ID, NAME, VALUE, PREVIOUS_VALUE" + + ") VALUES (" + + "?, " + + "?, " + + "?, " + + "?" + + ")"; + + private static final String INSERT_CONNECT_DETAILS = "INSERT INTO CONNECT_DETAILS (" + + "ACTION_ID, SOURCE_ID, SOURCE_NAME, SOURCE_TYPE, RELATIONSHIP, DESTINATION_ID, DESTINATION_NAME, DESTINATION_TYPE" + + ") VALUES (" + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?, " + + "?" + + ")"; + + private static final String INSERT_MOVE_DETAILS = "INSERT INTO MOVE_DETAILS (" + + "ACTION_ID, GROUP_ID, GROUP_NAME, PREVIOUS_GROUP_ID, PREVIOUS_GROUP_NAME" + + ") VALUES (" + + "?, " + + "?, " + + "?, " + + "?, " + + "?" + + ")"; + + private static final String INSERT_PURGE_DETAILS = "INSERT INTO PURGE_DETAILS (" + + "ACTION_ID, END_DATE" + + ") VALUES (" + + "?, " + + "?" 
+ + ")"; + + // ------------ + // action table + // ------------ + private static final String SELECT_ACTIONS = "SELECT * FROM ACTION"; + + private static final String SELECT_ACTION_COUNT = "SELECT COUNT(*) AS ACTION_COUNT FROM ACTION"; + + private static final String SELECT_ACTION_BY_ID = "SELECT * " + + "FROM ACTION " + + "WHERE " + + "ID = ?"; + + private static final String DELETE_ACTIONS = "DELETE FROM ACTION WHERE ACTION_TIMESTAMP < ?"; + + private static final String DELETE_SPECIFIC_ACTIONS = "DELETE FROM %s WHERE %s IN (SELECT ID FROM ACTION WHERE ACTION_TIMESTAMP < ?)"; + + // ----------------- + // component details + // ----------------- + private static final String SELECT_EXTENSION_DETAILS_FOR_ACTION = "SELECT * FROM PROCESSOR_DETAILS WHERE ACTION_ID = ?"; + + private static final String SELECT_REMOTE_PROCESS_GROUP_DETAILS_FOR_ACTION = "SELECT * FROM REMOTE_PROCESS_GROUP_DETAILS WHERE ACTION_ID = ?"; + + // -------------- + // action details + // -------------- + private static final String SELECT_MOVE_DETAILS_FOR_ACTION = "SELECT * FROM MOVE_DETAILS WHERE ACTION_ID = ?"; + + private static final String SELECT_CONFIGURE_DETAILS_FOR_ACTION = "SELECT * FROM CONFIGURE_DETAILS WHERE ACTION_ID = ?"; + + private static final String SELECT_CONNECT_DETAILS_FOR_ACTION = "SELECT * FROM CONNECT_DETAILS WHERE ACTION_ID = ?"; + + private static final String SELECT_PURGE_DETAILS_FOR_ACTION = "SELECT * FROM PURGE_DETAILS WHERE ACTION_ID = ?"; + + // --------------- + // previous values + // --------------- + private static final String SELECT_PREVIOUSLY_CONFIGURED_FIELDS = "SELECT DISTINCT CD.NAME " + + "FROM CONFIGURE_DETAILS CD " + + "INNER JOIN ACTION A " + + "ON CD.ACTION_ID = A.ID " + + "WHERE A.SOURCE_ID = ?"; + + private static final String SELECT_PREVIOUS_VALUES = "SELECT CD.VALUE, " + + "A.ACTION_TIMESTAMP, " + + "A.USER_NAME " + + "FROM CONFIGURE_DETAILS CD " + + "INNER JOIN ACTION A " + + "ON CD.ACTION_ID = A.ID " + + "WHERE A.SOURCE_ID = ? AND CD.NAME = ? 
" + + "ORDER BY A.ACTION_TIMESTAMP DESC " + + "LIMIT 4"; + + private final Connection connection; + private final Map<String, String> columnMap; + + public StandardActionDAO(Connection connection) { + this.connection = connection; + + // initialize the column mappings + this.columnMap = new HashMap<>(); + this.columnMap.put("timestamp", "ACTION_TIMESTAMP"); + this.columnMap.put("sourceName", "SOURCE_NAME"); + this.columnMap.put("sourceType", "SOURCE_TYPE"); + this.columnMap.put("operation", "OPERATION"); + this.columnMap.put("userName", "USER_NAME"); + } + + @Override + public void createAction(Action action) throws DataAccessException { + if (action.getUserDn() == null) { + throw new IllegalArgumentException("User cannot be null."); + } + + if (action.getTimestamp() == null) { + throw new IllegalArgumentException("Action timestamp cannot be null."); + } + + PreparedStatement statement = null; + ResultSet rs = null; + try { + // obtain a statement to insert to the action table + statement = connection.prepareStatement(INSERT_ACTION, Statement.RETURN_GENERATED_KEYS); + statement.setString(1, StringUtils.left(action.getUserDn(), 255)); + statement.setString(2, StringUtils.left(action.getUserName(), 100)); + statement.setString(3, action.getSourceId()); + statement.setString(4, StringUtils.left(action.getSourceName(), 1000)); + statement.setString(5, action.getSourceType().toString()); + statement.setString(6, action.getOperation().toString()); + statement.setTimestamp(7, new java.sql.Timestamp(action.getTimestamp().getTime())); + + // insert the action + int updateCount = statement.executeUpdate(); + + // get the action id + rs = statement.getGeneratedKeys(); + if (updateCount == 1 && rs.next()) { + action.setId(rs.getInt(1)); + } else { + throw new DataAccessException("Unable to insert action."); + } + + // close the previous statement + statement.close(); + + // determine the type of component + ComponentDetails componentDetails = action.getComponentDetails(); + if 
(componentDetails instanceof ExtensionDetails) { + createExtensionDetails(action.getId(), (ExtensionDetails) componentDetails); + } else if (componentDetails instanceof RemoteProcessGroupDetails) { + createRemoteProcessGroupDetails(action.getId(), (RemoteProcessGroupDetails) componentDetails); + } + + // determine the type of action + ActionDetails details = action.getActionDetails(); + if (details instanceof ConnectDetails) { + createConnectDetails(action.getId(), (ConnectDetails) details); + } else if (details instanceof MoveDetails) { + createMoveDetails(action.getId(), (MoveDetails) details); + } else if (details instanceof ConfigureDetails) { + createConfigureDetails(action.getId(), (ConfigureDetails) details); + } else if (details instanceof PurgeDetails) { + createPurgeDetails(action.getId(), (PurgeDetails) details); + } + + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + } + + private void createExtensionDetails(int actionId, ExtensionDetails extensionDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the extension action table + statement = connection.prepareStatement(INSERT_EXTENSION_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, StringUtils.left(extensionDetails.getType(), 1000)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert extension details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + private void createRemoteProcessGroupDetails(int actionId, RemoteProcessGroupDetails remoteProcessGroupDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement 
to insert to the remote process group details table + statement = connection.prepareStatement(INSERT_REMOTE_PROCESS_GROUP_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, StringUtils.left(remoteProcessGroupDetails.getUri(), 2500)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert remote process group details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + /** + * Inserts the CONNECT_DETAILS row for the given action; name, type and relationship values are cut to 1000 characters. + * + * @param actionId id of the owning action + * @param connectionDetails details to store + * @throws DataAccessException if the insert fails or does not affect exactly one row + */ + private void createConnectDetails(int actionId, ConnectDetails connectionDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the connect details table + statement = connection.prepareStatement(INSERT_CONNECT_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, connectionDetails.getSourceId()); + statement.setString(3, StringUtils.left(connectionDetails.getSourceName(), 1000)); + statement.setString(4, StringUtils.left(connectionDetails.getSourceType().toString(), 1000)); + statement.setString(5, StringUtils.left(connectionDetails.getRelationship(), 1000)); + statement.setString(6, connectionDetails.getDestinationId()); + statement.setString(7, StringUtils.left(connectionDetails.getDestinationName(), 1000)); + statement.setString(8, StringUtils.left(connectionDetails.getDestinationType().toString(), 1000)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert connection details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + private void createMoveDetails(int actionId, MoveDetails moveDetails) throws DataAccessException { + 
PreparedStatement statement = null; + try { + // obtain a statement to insert to the processor action table + statement = connection.prepareStatement(INSERT_MOVE_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, moveDetails.getGroupId()); + statement.setString(3, StringUtils.left(moveDetails.getGroup(), 1000)); + statement.setString(4, moveDetails.getPreviousGroupId()); + statement.setString(5, StringUtils.left(moveDetails.getPreviousGroup(), 1000)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert move details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + private void createConfigureDetails(int actionId, ConfigureDetails configurationDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the processor action table + statement = connection.prepareStatement(INSERT_CONFIGURE_DETAILS); + statement.setInt(1, actionId); + statement.setString(2, StringUtils.left(configurationDetails.getName(), 1000)); + statement.setString(3, StringUtils.left(configurationDetails.getValue(), 5000)); + statement.setString(4, StringUtils.left(configurationDetails.getPreviousValue(), 5000)); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert configure details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + private void createPurgeDetails(int actionId, PurgeDetails purgeDetails) throws DataAccessException { + PreparedStatement statement = null; + try { + // obtain a statement to insert to the processor action table + 
statement = connection.prepareStatement(INSERT_PURGE_DETAILS); + statement.setInt(1, actionId); + statement.setTimestamp(2, new java.sql.Timestamp(purgeDetails.getEndDate().getTime())); + + // insert the action + int updateCount = statement.executeUpdate(); + + // ensure the operation completed successfully + if (updateCount != 1) { + throw new DataAccessException("Unable to insert purge details."); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + /** + * Finds the actions matching the given query, along with the total number of matches. + * + * @param historyQuery filter, sort and paging criteria + * @return the matching actions and the total match count + * @throws IllegalArgumentException if the sort column or the sort order is not recognized + * @throws DataAccessException if a query fails + */ + @Override + public History findActions(HistoryQuery historyQuery) throws DataAccessException { + + // get the sort column + String sortColumn = "ACTION_TIMESTAMP"; + if (StringUtils.isNotBlank(historyQuery.getSortColumn())) { + String rawColumnName = historyQuery.getSortColumn(); + if (!columnMap.containsKey(rawColumnName)) { + throw new IllegalArgumentException(String.format("Unrecognized column name '%s'.", rawColumnName)); + } + sortColumn = columnMap.get(rawColumnName); + } + + // get the sort order; it is concatenated into the SQL text, so only asc/desc are accepted + String sortOrder = "desc"; + if (StringUtils.isNotBlank(historyQuery.getSortOrder())) { + final String rawSortOrder = historyQuery.getSortOrder().trim(); + if (!"asc".equalsIgnoreCase(rawSortOrder) && !"desc".equalsIgnoreCase(rawSortOrder)) { + throw new IllegalArgumentException(String.format("Unrecognized sort order '%s'.", rawSortOrder)); + } + sortOrder = rawSortOrder; + } + + History actionResult = new History(); + Collection<Action> actions = new ArrayList<>(); + PreparedStatement statement = null; + ResultSet rs = null; + try { + List<String> where = new ArrayList<>(); + + // append the start time + if (historyQuery.getStartDate() != null) { + where.add("ACTION_TIMESTAMP >= ?"); + } + + // append the end time + if (historyQuery.getEndDate() != null) { + where.add("ACTION_TIMESTAMP <= ?"); + } + + // append the user id as necessary + if (historyQuery.getUserName() != null) { + where.add("UPPER(USER_NAME) LIKE ?"); + } + + // append the source id as necessary + if (historyQuery.getSourceId() != null) { + where.add("SOURCE_ID = ?"); + } + + String sql = SELECT_ACTION_COUNT; + if (!where.isEmpty()) { + sql += " WHERE " + StringUtils.join(where, " AND "); + } + 
+ // get the total number of actions + statement = connection.prepareStatement(sql); + int paramIndex = 1; + + // set the start date as necessary + if (historyQuery.getStartDate() != null) { + statement.setTimestamp(paramIndex++, new java.sql.Timestamp(historyQuery.getStartDate().getTime())); + } + + // set the end date as necessary + if (historyQuery.getEndDate() != null) { + statement.setTimestamp(paramIndex++, new java.sql.Timestamp(historyQuery.getEndDate().getTime())); + } + + // set the user id as necessary + if (historyQuery.getUserName() != null) { + statement.setString(paramIndex++, "%" + historyQuery.getUserName().toUpperCase() + "%"); + } + + // set the source id as necessary + if (historyQuery.getSourceId() != null) { + statement.setString(paramIndex, historyQuery.getSourceId()); + } + + // execute the statement + rs = statement.executeQuery(); + + // ensure there are results + if (rs.next()) { + actionResult.setTotal(rs.getInt("ACTION_COUNT")); + } else { + throw new DataAccessException("Unable to determine total action count."); + } + + sql = SELECT_ACTIONS; + if (!where.isEmpty()) { + sql += " WHERE " + StringUtils.join(where, " AND "); + } + + // append the sort criteria + sql += (" ORDER BY " + sortColumn + " " + sortOrder); + + // append the offset and limit + sql += " LIMIT ? 
OFFSET ?"; + + // close the previous statement + statement.close(); + + // create the statement + statement = connection.prepareStatement(sql); + paramIndex = 1; + + // set the start date as necessary + if (historyQuery.getStartDate() != null) { + statement.setTimestamp(paramIndex++, new java.sql.Timestamp(historyQuery.getStartDate().getTime())); + } + + // set the end date as necessary + if (historyQuery.getEndDate() != null) { + statement.setTimestamp(paramIndex++, new java.sql.Timestamp(historyQuery.getEndDate().getTime())); + } + + // set the user id as necessary + if (historyQuery.getUserName() != null) { + statement.setString(paramIndex++, "%" + historyQuery.getUserName().toUpperCase() + "%"); + } + + // set the source id as necessary + if (historyQuery.getSourceId() != null) { + statement.setString(paramIndex++, historyQuery.getSourceId()); + } + + // set the limit + statement.setInt(paramIndex++, historyQuery.getCount()); + + // set the offset requested by the query + statement.setInt(paramIndex, historyQuery.getOffset()); + + // execute the query + rs = statement.executeQuery(); + + // create each corresponding action + while (rs.next()) { + final Integer actionId = rs.getInt("ID"); + final Operation operation = Operation.valueOf(rs.getString("OPERATION")); + final Component component = Component.valueOf(rs.getString("SOURCE_TYPE")); + + // reuse the operation and component parsed above instead of parsing the same columns twice + Action action = new Action(); + action.setId(actionId); + action.setUserDn(rs.getString("USER_DN")); + action.setUserName(rs.getString("USER_NAME")); + action.setOperation(operation); + action.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime())); + action.setSourceId(rs.getString("SOURCE_ID")); + action.setSourceName(rs.getString("SOURCE_NAME")); + action.setSourceType(component); + + // get the component details if appropriate + ComponentDetails componentDetails = null; + if 
(Component.Processor.equals(component) || Component.ControllerService.equals(component) || Component.ReportingTask.equals(component)) { + componentDetails = getExtensionDetails(actionId); + } else if (Component.RemoteProcessGroup.equals(component)) { + componentDetails = getRemoteProcessGroupDetails(actionId); + } + + if (componentDetails != null) { + action.setComponentDetails(componentDetails); + } + + // get the action details if appropriate + ActionDetails actionDetails = null; + if (Operation.Move.equals(operation)) { + actionDetails = getMoveDetails(actionId); + } else if (Operation.Configure.equals(operation)) { + actionDetails = getConfigureDetails(actionId); + } else if (Operation.Connect.equals(operation) || Operation.Disconnect.equals(operation)) { + actionDetails = getConnectDetails(actionId); + } else if (Operation.Purge.equals(operation)) { + actionDetails = getPurgeDetails(actionId); + } + + // set the action details + if (actionDetails != null) { + action.setActionDetails(actionDetails); + } + + // add the action + actions.add(action); + } + + // populate the action result + actionResult.setActions(actions); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return actionResult; + } + + @Override + public Action getAction(Integer actionId) throws DataAccessException { + Action action = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_ACTION_BY_ID); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + Operation operation = Operation.valueOf(rs.getString("OPERATION")); + Component component = Component.valueOf(rs.getString("SOURCE_TYPE")); + + // populate the action + action = new Action(); + action.setId(rs.getInt("ID")); + 
action.setUserDn(rs.getString("USER_DN")); + action.setUserName(rs.getString("USER_NAME")); + action.setOperation(operation); + action.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime())); + action.setSourceId(rs.getString("SOURCE_ID")); + action.setSourceName(rs.getString("SOURCE_NAME")); + action.setSourceType(component); + + // get the component details if appropriate + ComponentDetails componentDetails = null; + if (Component.Processor.equals(component) || Component.ControllerService.equals(component) || Component.ReportingTask.equals(component)) { + componentDetails = getExtensionDetails(actionId); + } else if (Component.RemoteProcessGroup.equals(component)) { + componentDetails = getRemoteProcessGroupDetails(actionId); + } + + if (componentDetails != null) { + action.setComponentDetails(componentDetails); + } + + // get the action details if appropriate + ActionDetails actionDetails = null; + if (Operation.Move.equals(operation)) { + actionDetails = getMoveDetails(actionId); + } else if (Operation.Configure.equals(operation)) { + actionDetails = getConfigureDetails(actionId); + } else if (Operation.Connect.equals(operation) || Operation.Disconnect.equals(operation)) { + actionDetails = getConnectDetails(actionId); + } else if (Operation.Purge.equals(operation)) { + actionDetails = getPurgeDetails(actionId); + } + + // set the action details + if (actionDetails != null) { + action.setActionDetails(actionDetails); + } + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return action; + } + + private ExtensionDetails getExtensionDetails(Integer actionId) throws DataAccessException { + ExtensionDetails extensionDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_EXTENSION_DETAILS_FOR_ACTION); + statement.setInt(1, 
actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + extensionDetails = new ExtensionDetails(); + extensionDetails.setType(rs.getString("TYPE")); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return extensionDetails; + } + + private RemoteProcessGroupDetails getRemoteProcessGroupDetails(Integer actionId) throws DataAccessException { + RemoteProcessGroupDetails remoteProcessGroupDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_REMOTE_PROCESS_GROUP_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + remoteProcessGroupDetails = new RemoteProcessGroupDetails(); + remoteProcessGroupDetails.setUri(rs.getString("URI")); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return remoteProcessGroupDetails; + } + + private MoveDetails getMoveDetails(Integer actionId) throws DataAccessException { + MoveDetails moveDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_MOVE_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + moveDetails = new MoveDetails(); + moveDetails.setGroupId(rs.getString("GROUP_ID")); + moveDetails.setGroup(rs.getString("GROUP_NAME")); + moveDetails.setPreviousGroupId(rs.getString("PREVIOUS_GROUP_ID")); + moveDetails.setPreviousGroup(rs.getString("PREVIOUS_GROUP_NAME")); + } + } catch (SQLException sqle) { + throw new 
DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return moveDetails; + } + + private ConnectDetails getConnectDetails(Integer actionId) throws DataAccessException { + ConnectDetails connectionDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_CONNECT_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + final Component sourceComponent = Component.valueOf(rs.getString("SOURCE_TYPE")); + final Component destinationComponent = Component.valueOf(rs.getString("DESTINATION_TYPE")); + + connectionDetails = new ConnectDetails(); + connectionDetails.setSourceId(rs.getString("SOURCE_ID")); + connectionDetails.setSourceName(rs.getString("SOURCE_NAME")); + connectionDetails.setSourceType(sourceComponent); + connectionDetails.setRelationship(rs.getString("RELATIONSHIP")); + connectionDetails.setDestinationId(rs.getString("DESTINATION_ID")); + connectionDetails.setDestinationName(rs.getString("DESTINATION_NAME")); + connectionDetails.setDestinationType(destinationComponent); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return connectionDetails; + } + + private ConfigureDetails getConfigureDetails(Integer actionId) throws DataAccessException { + ConfigureDetails configurationDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_CONFIGURE_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + configurationDetails = new ConfigureDetails(); + 
configurationDetails.setName(rs.getString("NAME")); + configurationDetails.setValue(rs.getString("VALUE")); + configurationDetails.setPreviousValue(rs.getString("PREVIOUS_VALUE")); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return configurationDetails; + } + + private PurgeDetails getPurgeDetails(Integer actionId) throws DataAccessException { + PurgeDetails purgeDetails = null; + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_PURGE_DETAILS_FOR_ACTION); + statement.setInt(1, actionId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + if (rs.next()) { + purgeDetails = new PurgeDetails(); + purgeDetails.setEndDate(new Date(rs.getTimestamp("END_DATE").getTime())); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return purgeDetails; + } + + @Override + public Map<String, List<PreviousValue>> getPreviousValues(String componentId) { + Map<String, List<PreviousValue>> previousValues = new LinkedHashMap<>(); + + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_PREVIOUSLY_CONFIGURED_FIELDS); + statement.setString(1, componentId); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + while (rs.next()) { + final String property = rs.getString("NAME"); + previousValues.put(property, getPreviousValuesForProperty(componentId, property)); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return previousValues; + } + + private List<PreviousValue> 
getPreviousValuesForProperty(final String componentId, final String property) { + List<PreviousValue> previousValues = new ArrayList<>(); + + PreparedStatement statement = null; + ResultSet rs = null; + try { + // create the statement + statement = connection.prepareStatement(SELECT_PREVIOUS_VALUES); + statement.setString(1, componentId); + statement.setString(2, property); + + // execute the query + rs = statement.executeQuery(); + + // ensure results + while (rs.next()) { + // get the previous value + final PreviousValue previousValue = new PreviousValue(); + previousValue.setPreviousValue(rs.getString("VALUE")); + previousValue.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime())); + previousValue.setUserName(rs.getString("USER_NAME")); + previousValues.add(previousValue); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return previousValues; + } + + @Override + public void deleteActions(Date endDate) throws DataAccessException { + PreparedStatement statement = null; + try { + // ----------------- + // component details + // ----------------- + + // create the move delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "PROCESSOR_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // create the move delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "REMOTE_PROCESS_GROUP_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // -------------- + // action details + // -------------- + // create the move delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "MOVE_DETAILS", "ACTION_ID")); + 
statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // create the configure delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "CONFIGURE_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // create the connect delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "CONNECT_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // create the relationship delete statement + statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "PURGE_DETAILS", "ACTION_ID")); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + statement.close(); + + // ------- + // actions + // ------- + // create the action delete statement + statement = connection.prepareStatement(DELETE_ACTIONS); + statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime())); + statement.executeUpdate(); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardAuthorityDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardAuthorityDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardAuthorityDAO.java new file mode 100644 index 0000000..4e2cc26 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/dao/impl/StandardAuthorityDAO.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.admin.dao.impl; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.EnumSet; +import java.util.Set; +import org.apache.nifi.admin.RepositoryUtils; +import org.apache.nifi.admin.dao.AuthorityDAO; +import org.apache.nifi.admin.dao.DataAccessException; +import org.apache.nifi.authorization.Authority; + +/** + * + */ +public class StandardAuthorityDAO implements AuthorityDAO { + + private static final String SELECT_AUTHORITIES_FOR_USER = "SELECT ID, ROLE " + + "FROM AUTHORITY " + + "WHERE USER_ID = ?"; + + private static final String INSERT_AUTHORITY = "INSERT INTO AUTHORITY (" + + "USER_ID, ROLE" + + ") VALUES (" + + "?, ?" + + ")"; + + private static final String DELETE_AUTHORITY = "DELETE FROM AUTHORITY " + + "WHERE USER_ID = ? 
AND ROLE = ?"; + + private static final String DELETE_AUTHORITIES_FOR_USER = "DELETE FROM AUTHORITY " + + "WHERE USER_ID = ?"; + + private final Connection connection; + + public StandardAuthorityDAO(Connection connection) { + this.connection = connection; + } + + @Override + public void createAuthorities(Set<Authority> authorities, String userId) throws DataAccessException { + if (authorities == null) { + throw new IllegalArgumentException("Specified authorities cannot be null."); + } + + // ensure there are some authorities to create + if (!authorities.isEmpty()) { + PreparedStatement statement = null; + try { + // add each authority for the specified user + statement = connection.prepareStatement(INSERT_AUTHORITY); + statement.setString(1, userId); + for (Authority authority : authorities) { + statement.setString(2, authority.toString()); + statement.addBatch(); + } + + // insert the authorities + int[] updateCounts = statement.executeBatch(); + for (int updateCount : updateCounts) { + if (updateCount != 1) { + throw new DataAccessException("Unable to insert user authorities."); + } + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + } + + @Override + public void deleteAuthorities(String userId) throws DataAccessException { + // ensure there are some authorities to create + PreparedStatement statement = null; + try { + // add each authority for the specified user + statement = connection.prepareStatement(DELETE_AUTHORITIES_FOR_USER); + statement.setString(1, userId); + + // insert the authorities + statement.executeUpdate(); + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + + @Override + public void deleteAuthorities(Set<Authority> authorities, String userId) throws DataAccessException { + if (authorities == null) { + throw new 
IllegalArgumentException("Specified authorities cannot be null."); + } + + // ensure there are some authorities to create + if (!authorities.isEmpty()) { + PreparedStatement statement = null; + try { + // add each authority for the specified user + statement = connection.prepareStatement(DELETE_AUTHORITY); + statement.setString(1, userId); + for (Authority authority : authorities) { + statement.setString(2, authority.toString()); + statement.addBatch(); + } + + // insert the authorities + int[] updateCounts = statement.executeBatch(); + for (int updateCount : updateCounts) { + if (updateCount != 1) { + throw new DataAccessException("Unable to remove user authorities."); + } + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } catch (DataAccessException dae) { + throw dae; + } finally { + RepositoryUtils.closeQuietly(statement); + } + } + } + + @Override + public Set<Authority> findAuthoritiesByUserId(String userId) throws DataAccessException { + Set<Authority> authorities = EnumSet.noneOf(Authority.class); + PreparedStatement statement = null; + ResultSet rs = null; + try { + // add each authority for the specified user + statement = connection.prepareStatement(SELECT_AUTHORITIES_FOR_USER); + statement.setString(1, userId); + + // execute the query + rs = statement.executeQuery(); + + // create each corresponding authority + while (rs.next()) { + authorities.add(Authority.valueOfAuthority(rs.getString("ROLE"))); + } + } catch (SQLException sqle) { + throw new DataAccessException(sqle); + } finally { + RepositoryUtils.closeQuietly(rs); + RepositoryUtils.closeQuietly(statement); + } + + return authorities; + } + +}