http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java b/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java new file mode 100644 index 0000000..55f59ba --- /dev/null +++ b/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.authorization; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Set; +import javax.xml.XMLConstants; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import org.apache.nifi.authorization.annotation.AuthorityProviderContext; +import org.apache.nifi.authorization.exception.AuthorityAccessException; +import org.apache.nifi.authorization.exception.IdentityAlreadyExistsException; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.authorization.exception.UnknownIdentityException; +import org.apache.nifi.file.FileUtils; +import org.apache.nifi.user.generated.ObjectFactory; +import org.apache.nifi.user.generated.Role; +import org.apache.nifi.user.generated.User; +import org.apache.nifi.user.generated.Users; +import org.apache.nifi.util.NiFiProperties; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +/** + * Provides identity checks and grants authorities. + */ +public class FileAuthorizationProvider implements AuthorityProvider { + + private static final Logger logger = LoggerFactory.getLogger(FileAuthorizationProvider.class); + private static final String USERS_XSD = "/users.xsd"; + private static final String JAXB_GENERATED_PATH = "org.apache.nifi.user.generated"; + private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext(); + + /** + * Load the JAXBContext. 
+ */ + private static JAXBContext initializeJaxbContext() { + try { + return JAXBContext.newInstance(JAXB_GENERATED_PATH, FileAuthorizationProvider.class.getClassLoader()); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext."); + } + } + + private NiFiProperties properties; + private File usersFile; + private File restoreUsersFile; + private Users users; + private final Set<String> defaultAuthorities = new HashSet<>(); + + @Override + public void initialize(final AuthorityProviderInitializationContext initializationContext) throws ProviderCreationException { + } + + @Override + public void onConfigured(final AuthorityProviderConfigurationContext configurationContext) throws ProviderCreationException { + try { + final String usersFilePath = configurationContext.getProperty("Authorized Users File"); + if (usersFilePath == null || usersFilePath.trim().isEmpty()) { + throw new ProviderCreationException("The authorized users file must be specified."); + } + + // the users file instance will never be null because a default is used + usersFile = new File(usersFilePath); + final File usersFileDirectory = usersFile.getParentFile(); + + // the restore directory is optional and may be null + final File restoreDirectory = properties.getRestoreDirectory(); + + if (restoreDirectory != null) { + + // sanity check that restore directory is a directory, creating it if necessary + FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory); + + // check that restore directory is not the same as the primary directory + if (usersFileDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) { + throw new ProviderCreationException(String.format("Authorized User's directory '%s' is the same as restore directory '%s' ", + usersFileDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath())); + } + + // the restore copy will have same file name, but reside in a different directory + restoreUsersFile = new File(restoreDirectory, 
usersFile.getName()); + + // sync the primary copy with the restore copy + try { + FileUtils.syncWithRestore(usersFile, restoreUsersFile, logger); + } catch (final IOException | IllegalStateException ioe) { + throw new ProviderCreationException(ioe); + } + + } + + // load the users from the specified file + if (usersFile.exists()) { + // find the schema + final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + final Schema schema = schemaFactory.newSchema(FileAuthorizationProvider.class.getResource(USERS_XSD)); + + // attempt to unmarshal + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + unmarshaller.setSchema(schema); + final JAXBElement<Users> element = unmarshaller.unmarshal(new StreamSource(usersFile), Users.class); + users = element.getValue(); + } else { + final ObjectFactory objFactory = new ObjectFactory(); + users = objFactory.createUsers(); + } + + // attempt to load a default roles + final String rawDefaultAuthorities = configurationContext.getProperty("Default User Roles"); + if (StringUtils.isNotBlank(rawDefaultAuthorities)) { + final Set<String> invalidDefaultAuthorities = new HashSet<>(); + + // validate the specified authorities + final String[] rawDefaultAuthorityList = rawDefaultAuthorities.split(","); + for (String rawAuthority : rawDefaultAuthorityList) { + rawAuthority = rawAuthority.trim(); + final Authority authority = Authority.valueOfAuthority(rawAuthority); + if (authority == null) { + invalidDefaultAuthorities.add(rawAuthority); + } else { + defaultAuthorities.add(rawAuthority); + } + } + + // report any unrecognized authorities + if (!invalidDefaultAuthorities.isEmpty()) { + logger.warn(String.format("The following default role(s) '%s' were not recognized. 
Possible values: %s.", + StringUtils.join(invalidDefaultAuthorities, ", "), StringUtils.join(Authority.getRawAuthorities(), ", "))); + } + } + } catch (IOException | ProviderCreationException | SAXException | JAXBException e) { + throw new ProviderCreationException(e); + } + + } + + @Override + public void preDestruction() { + } + + /** + * Determines if this provider has a default role. + * + * @return + */ + private boolean hasDefaultRoles() { + return !defaultAuthorities.isEmpty(); + } + + /** + * Determines if the specified dn is known to this authority provider. When + * this provider is configured to have default role(s), all dn are + * considered to exist. + * + * @param dn + * @return True if he dn is known, false otherwise + */ + @Override + public boolean doesDnExist(String dn) throws AuthorityAccessException { + if (hasDefaultRoles()) { + return true; + } + + final User user = getUser(dn); + return user != null; + } + + /** + * Loads the authorities for the specified user. If this provider is + * configured for default user role(s) and a non existent dn is specified, a + * new user will be automatically created with the default role(s). + * + * @param dn + * @return + * @throws UnknownIdentityException + * @throws AuthorityAccessException + */ + @Override + public synchronized Set<Authority> getAuthorities(String dn) throws UnknownIdentityException, AuthorityAccessException { + final Set<Authority> authorities = EnumSet.noneOf(Authority.class); + + // get the user + final User user = getUser(dn); + + // ensure the user was located + if (user == null) { + if (hasDefaultRoles()) { + logger.debug(String.format("User DN not found: %s. 
Creating new user with default roles.", dn)); + + // create the user (which will automatically add any default authorities) + addUser(dn, null); + + // get the authorities for the newly created user + authorities.addAll(getAuthorities(dn)); + } else { + throw new UnknownIdentityException(String.format("User DN not found: %s.", dn)); + } + } else { + // create the authorities that this user has + for (final Role role : user.getRole()) { + authorities.add(Authority.valueOfAuthority(role.getName())); + } + } + + return authorities; + } + + /** + * Adds the specified authorities to the specified user. Regardless of + * whether this provider is configured for a default user role, when a non + * existent dn is specified, an UnknownIdentityException will be thrown. + * + * @param dn + * @param authorities + * @throws UnknownIdentityException + * @throws AuthorityAccessException + */ + @Override + public synchronized void setAuthorities(String dn, Set<Authority> authorities) throws UnknownIdentityException, AuthorityAccessException { + // get the user + final User user = getUser(dn); + + // ensure the user was located + if (user == null) { + throw new UnknownIdentityException(String.format("User DN not found: %s.", dn)); + } + + // add the user authorities + setUserAuthorities(user, authorities); + + try { + // save the file + save(); + } catch (Exception e) { + throw new AuthorityAccessException(e.getMessage(), e); + } + } + + /** + * Adds the specified authorities to the specified user. 
+ * + * @param user + * @param authorities + */ + private void setUserAuthorities(final User user, final Set<Authority> authorities) { + // clear the existing rules + user.getRole().clear(); + + // set the new roles + final ObjectFactory objFactory = new ObjectFactory(); + for (final Authority authority : authorities) { + final Role role = objFactory.createRole(); + role.setName(authority.toString()); + + // add the new role + user.getRole().add(role); + } + } + + /** + * Adds the specified user. If this provider is configured with default + * role(s) they will be added to the new user. + * + * @param dn + * @param group + * @throws UnknownIdentityException + * @throws AuthorityAccessException + */ + @Override + public synchronized void addUser(String dn, String group) throws IdentityAlreadyExistsException, AuthorityAccessException { + final User user = getUser(dn); + + // ensure the user doesn't already exist + if (user != null) { + throw new IdentityAlreadyExistsException(String.format("User DN already exists: %s", dn)); + } + + // create the new user + final ObjectFactory objFactory = new ObjectFactory(); + final User newUser = objFactory.createUser(); + + // set the user properties + newUser.setDn(dn); + newUser.setGroup(group); + + // add default roles if appropriate + if (hasDefaultRoles()) { + for (final String authority : defaultAuthorities) { + Role role = objFactory.createRole(); + role.setName(authority); + + // add the role + newUser.getRole().add(role); + } + } + + // add the user + users.getUser().add(newUser); + + try { + // save the file + save(); + } catch (Exception e) { + throw new AuthorityAccessException(e.getMessage(), e); + } + } + + /** + * Gets the users for the specified authority. 
+ * + * @param authority + * @return + * @throws AuthorityAccessException + */ + @Override + public synchronized Set<String> getUsers(Authority authority) throws AuthorityAccessException { + final Set<String> userSet = new HashSet<>(); + for (final User user : users.getUser()) { + for (final Role role : user.getRole()) { + if (role.getName().equals(authority.toString())) { + userSet.add(user.getDn()); + } + } + } + return userSet; + } + + /** + * Removes the specified user. Regardless of whether this provider is + * configured for a default user role, when a non existent dn is specified, + * an UnknownIdentityException will be thrown. + * + * @param dn + * @throws UnknownIdentityException + * @throws AuthorityAccessException + */ + @Override + public synchronized void revokeUser(String dn) throws UnknownIdentityException, AuthorityAccessException { + // get the user + final User user = getUser(dn); + + // ensure the user was located + if (user == null) { + throw new UnknownIdentityException(String.format("User DN not found: %s.", dn)); + } + + // remove the specified user + users.getUser().remove(user); + + try { + // save the file + save(); + } catch (Exception e) { + throw new AuthorityAccessException(e.getMessage(), e); + } + } + + @Override + public void setUsersGroup(Set<String> dns, String group) throws UnknownIdentityException, AuthorityAccessException { + final Collection<User> groupedUsers = new HashSet<>(); + + // get the specified users + for (final String dn : dns) { + // get the user + final User user = getUser(dn); + + // ensure the user was located + if (user == null) { + throw new UnknownIdentityException(String.format("User DN not found: %s.", dn)); + } + + groupedUsers.add(user); + } + + // update each user group + for (final User user : groupedUsers) { + user.setGroup(group); + } + + try { + // save the file + save(); + } catch (Exception e) { + throw new AuthorityAccessException(e.getMessage(), e); + } + } + + @Override + public void 
ungroupUser(String dn) throws UnknownIdentityException, AuthorityAccessException { + // get the user + final User user = getUser(dn); + + // ensure the user was located + if (user == null) { + throw new UnknownIdentityException(String.format("User DN not found: %s.", dn)); + } + + // remove the users group + user.setGroup(null); + + try { + // save the file + save(); + } catch (Exception e) { + throw new AuthorityAccessException(e.getMessage(), e); + } + } + + @Override + public void ungroup(String group) throws AuthorityAccessException { + // get the user group + final Collection<User> userGroup = getUserGroup(group); + + // ensure the user group was located + if (userGroup == null) { + return; + } + + // update each user group + for (final User user : userGroup) { + user.setGroup(null); + } + + try { + // save the file + save(); + } catch (Exception e) { + throw new AuthorityAccessException(e.getMessage(), e); + } + } + + @Override + public String getGroupForUser(String dn) throws UnknownIdentityException, AuthorityAccessException { + // get the user + final User user = getUser(dn); + + // ensure the user was located + if (user == null) { + throw new UnknownIdentityException(String.format("User DN not found: %s.", dn)); + } + + return user.getGroup(); + } + + @Override + public void revokeGroup(String group) throws UnknownIdentityException, AuthorityAccessException { + // get the user group + final Collection<User> userGroup = getUserGroup(group); + + // ensure the user group was located + if (userGroup == null) { + throw new UnknownIdentityException(String.format("User group not found: %s.", group)); + } + + // remove each user in the group + for (final User user : userGroup) { + users.getUser().remove(user); + } + + try { + // save the file + save(); + } catch (Exception e) { + throw new AuthorityAccessException(e.getMessage(), e); + } + } + + /** + * Locates the user with the specified DN. 
+ * + * @param dn + * @return + */ + private User getUser(String dn) throws UnknownIdentityException { + // ensure the DN was specified + if (dn == null) { + throw new UnknownIdentityException("User DN not specified."); + } + + // attempt to get the user and ensure it was located + User desiredUser = null; + for (final User user : users.getUser()) { + if (dn.equalsIgnoreCase(user.getDn())) { + desiredUser = user; + break; + } + } + + return desiredUser; + } + + /** + * Locates all users that are part of the specified group. + * + * @param group + * @return + * @throws UnknownIdentityException + */ + private Collection<User> getUserGroup(String group) throws UnknownIdentityException { + // ensure the DN was specified + if (group == null) { + throw new UnknownIdentityException("User group not specified."); + } + + // get all users with this group + Collection<User> userGroup = null; + for (final User user : users.getUser()) { + if (group.equals(user.getGroup())) { + if (userGroup == null) { + userGroup = new HashSet<>(); + } + userGroup.add(user); + } + } + + return userGroup; + } + + /** + * Saves the users file. + * + * @throws Exception + */ + private void save() throws Exception { + final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); + + // save users to restore directory before primary directory + if (restoreUsersFile != null) { + marshaller.marshal(users, restoreUsersFile); + } + + // save users to primary directory + marshaller.marshal(users, usersFile); + } + + @AuthorityProviderContext + public void setNiFiProperties(NiFiProperties properties) { + this.properties = properties; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider ---------------------------------------------------------------------- diff --git a/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider b/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider new file mode 100755 index 0000000..93d2941 --- /dev/null +++ b/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.nifi.authorization.FileAuthorizationProvider http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/main/xsd/users.xsd ---------------------------------------------------------------------- diff --git a/extensions/file-authorization-provider/src/main/xsd/users.xsd b/extensions/file-authorization-provider/src/main/xsd/users.xsd new file mode 100644 index 0000000..4ee1e17 --- /dev/null +++ b/extensions/file-authorization-provider/src/main/xsd/users.xsd @@ -0,0 +1,64 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> + <!-- role --> + <xs:complexType name="Role"> + <xs:attribute name="name"> + <xs:simpleType> + <xs:restriction base="xs:string"> + <xs:enumeration value="ROLE_MONITOR"/> + <xs:enumeration value="ROLE_PROVENANCE"/> + <xs:enumeration value="ROLE_DFM"/> + <xs:enumeration value="ROLE_ADMIN"/> + <xs:enumeration value="ROLE_PROXY"/> + <xs:enumeration value="ROLE_NIFI"/> + </xs:restriction> + </xs:simpleType> + </xs:attribute> + </xs:complexType> + + <!-- user --> + <xs:complexType name="User"> + <xs:sequence> + <xs:element name="role" type="Role" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + <xs:attribute name="dn"> + <xs:simpleType> + <xs:restriction base="xs:string"> + <xs:minLength value="1"/> + <xs:pattern value=".*[^\s].*"/> + </xs:restriction> + </xs:simpleType> + </xs:attribute> + <xs:attribute name="group"> + <xs:simpleType> + <xs:restriction base="xs:string"> + <xs:minLength value="1"/> + <xs:pattern value=".*[^\s].*"/> + </xs:restriction> + </xs:simpleType> + </xs:attribute> + </xs:complexType> + + <!-- users --> + <xs:element name="users"> + <xs:complexType> + <xs:sequence> + <xs:element name="user" type="User" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + </xs:element> +</xs:schema> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java ---------------------------------------------------------------------- diff --git a/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java b/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java new file mode 100644 index 0000000..3d0196d --- /dev/null +++ 
b/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.authorization; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.file.FileUtils; +import org.apache.nifi.util.NiFiProperties; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; +import org.junit.Ignore; +import org.mockito.Mockito; + +@Ignore +public class FileAuthorizationProviderTest { + + private FileAuthorizationProvider provider; + + private File primary; + + private File restore; + + private NiFiProperties mockProperties; + + private AuthorityProviderConfigurationContext mockConfigurationContext; + + @Before + public void setup() throws IOException { + + primary = new File("target/primary/users.txt"); + restore = new File("target/restore/users.txt"); + + System.out.println("absolute path: " + primary.getAbsolutePath()); + + mockProperties = mock(NiFiProperties.class); + 
when(mockProperties.getRestoreDirectory()).thenReturn(restore.getParentFile()); + + mockConfigurationContext = mock(AuthorityProviderConfigurationContext.class); + when(mockConfigurationContext.getProperty(Mockito.eq("Authorized Users File"))).thenReturn(primary.getPath()); + + provider = new FileAuthorizationProvider(); + provider.setNiFiProperties(mockProperties); + provider.initialize(null); + } + + @After + public void cleanup() throws Exception { + deleteFile(primary); + deleteFile(restore); + } + + private boolean deleteFile(final File file) { + if(file.isDirectory()) { + FileUtils.deleteFilesInDir(file, null, null, true, true); + } + return FileUtils.deleteFile(file, null, 10); + } + + @Test + public void testPostContructionWhenRestoreDoesNotExist() throws Exception { + + byte[] primaryBytes = "<users/>".getBytes(); + FileOutputStream fos = new FileOutputStream(primary); + fos.write(primaryBytes); + fos.close(); + + provider.onConfigured(mockConfigurationContext); + assertEquals(primary.length(), restore.length()); + } + + @Test + public void testPostContructionWhenPrimaryDoesNotExist() throws Exception { + + byte[] restoreBytes = "<users/>".getBytes(); + FileOutputStream fos = new FileOutputStream(restore); + fos.write(restoreBytes); + fos.close(); + + provider.onConfigured(mockConfigurationContext); + assertEquals(restore.length(), primary.length()); + + } + + @Test(expected = ProviderCreationException.class) + public void testPostContructionWhenPrimaryDifferentThanRestore() throws Exception { + + byte[] primaryBytes = "<users></users>".getBytes(); + FileOutputStream fos = new FileOutputStream(primary); + fos.write(primaryBytes); + fos.close(); + + byte[] restoreBytes = "<users/>".getBytes(); + fos = new FileOutputStream(restore); + fos.write(restoreBytes); + fos.close(); + + provider.onConfigured(mockConfigurationContext); + } + + @Test + public void testPostContructionWhenPrimaryAndBackupDoNotExist() throws Exception { + + 
provider.onConfigured(mockConfigurationContext); + assertEquals(0, restore.length()); + assertEquals(restore.length(), primary.length()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/build-order.sh ---------------------------------------------------------------------- diff --git a/misc/build-order.sh b/misc/build-order.sh new file mode 100755 index 0000000..855321a --- /dev/null +++ b/misc/build-order.sh @@ -0,0 +1,79 @@ +#MAVEN_FLAGS="-Dmaven.test.skip=true" +MAVEN_FLAGS="" + +cd misc/nar-maven-plugin && \ +mvn $MAVEN_FLAGS install && \ +cd ../../commons/nifi-parent && \ +mvn $MAVEN_FLAGS install && \ +cd ../../nifi-api && \ +mvn $MAVEN_FLAGS install && \ +cd ../commons/ && \ +cd nifi-stream-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../wali && \ +mvn $MAVEN_FLAGS install && \ +cd ../flowfile-packager && \ +mvn $MAVEN_FLAGS install && \ +cd ../core-flowfile-attributes && \ +mvn $MAVEN_FLAGS install && \ +cd ../data-provenance-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../naive-search-ring-buffer && \ +mvn $MAVEN_FLAGS install && \ +cd ../nifi-expression-language && \ +mvn $MAVEN_FLAGS install && \ +cd ../nifi-file-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../nifi-logging-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../nifi-properties && \ +mvn $MAVEN_FLAGS install && \ +cd ../nifi-security-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../nifi-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../nifi-socket-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../nifi-web-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../processor-utilities && \ +mvn $MAVEN_FLAGS install && \ +cd ../remote-communications-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../search-utils && \ +mvn $MAVEN_FLAGS install && \ +cd ../../extensions/file-authorization-provider && \ +mvn $MAVEN_FLAGS install && \ +cd ../../nifi-mock && \ +mvn $MAVEN_FLAGS install && \ +cd ../nar-bundles/ && \ +cd nar-container-common && \ +mvn $MAVEN_FLAGS install && \ +cd 
../jetty-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../standard-services-api-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../ssl-context-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../distributed-cache-services-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../standard-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../hadoop-libraries-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../hadoop-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../volatile-provenance-repository-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../persistent-provenance-repository-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../framework-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../execute-script-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../monitor-threshold-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../update-attribute-bundle && \ +mvn $MAVEN_FLAGS install && \ +cd ../../assemblies/nifi +mvn assembly:assembly http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/nar-maven-plugin/pom.xml ---------------------------------------------------------------------- diff --git a/misc/nar-maven-plugin/pom.xml b/misc/nar-maven-plugin/pom.xml new file mode 100644 index 0000000..3888df3 --- /dev/null +++ b/misc/nar-maven-plugin/pom.xml @@ -0,0 +1,83 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.nifi</groupId> + <artifactId>nar-maven-plugin</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>maven-plugin</packaging> + <name>Apache NiFi NAR Plugin</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + <build> + <defaultGoal>install</defaultGoal> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.2</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-plugin-plugin</artifactId> + <version>3.3</version> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.maven</groupId> + <artifactId>maven-plugin-api</artifactId> + <version>2.0.11</version> + </dependency> + <dependency> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.9</version> + <type>maven-plugin</type> + </dependency> + <dependency> + <!-- No code from maven-jar-plugin is actually used; it's included + just to simplify the dependencies list. 
--> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + </dependency> + <dependency> + <groupId>org.apache.maven.plugin-tools</groupId> + <artifactId>maven-plugin-annotations</artifactId> + <version>3.3</version> + </dependency> + </dependencies> + <distributionManagement> + <repository> + <id>nifi-releases</id> + <url>${nifi.repo.url}</url> + </repository> + <snapshotRepository> + <id>nifi-snapshots</id> + <url>${nifi.snapshot.repo.url}</url> + </snapshotRepository> + </distributionManagement> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java ---------------------------------------------------------------------- diff --git a/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java new file mode 100644 index 0000000..263fe88 --- /dev/null +++ b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package nifi; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.maven.archiver.MavenArchiveConfiguration; +import org.apache.maven.archiver.MavenArchiver; +import org.apache.maven.artifact.Artifact; +import org.apache.maven.artifact.DependencyResolutionRequiredException; +import org.apache.maven.artifact.factory.ArtifactFactory; +import org.apache.maven.artifact.installer.ArtifactInstaller; +import org.apache.maven.artifact.metadata.ArtifactMetadataSource; +import org.apache.maven.artifact.repository.ArtifactRepository; +import org.apache.maven.artifact.repository.ArtifactRepositoryFactory; +import org.apache.maven.artifact.resolver.ArtifactCollector; +import org.apache.maven.artifact.resolver.ArtifactNotFoundException; +import org.apache.maven.artifact.resolver.ArtifactResolutionException; +import org.apache.maven.artifact.resolver.ArtifactResolver; +import org.apache.maven.plugin.AbstractMojo; +import org.apache.maven.plugin.MojoExecutionException; +import org.apache.maven.plugin.MojoFailureException; +import org.apache.maven.plugin.dependency.utils.DependencyStatusSets; +import org.apache.maven.plugin.dependency.utils.DependencyUtil; +import org.apache.maven.plugin.dependency.utils.filters.DestFileFilter; +import org.apache.maven.plugin.dependency.utils.resolvers.ArtifactsResolver; +import org.apache.maven.plugin.dependency.utils.resolvers.DefaultArtifactsResolver; +import org.apache.maven.plugin.dependency.utils.translators.ArtifactTranslator; +import org.apache.maven.plugin.dependency.utils.translators.ClassifierTypeTranslator; +import org.apache.maven.plugins.annotations.LifecyclePhase; +import org.apache.maven.plugins.annotations.Mojo; +import org.apache.maven.plugins.annotations.Parameter; +import org.apache.maven.plugins.annotations.ResolutionScope; +import org.apache.maven.project.MavenProject; +import org.apache.maven.execution.MavenSession; +import 
org.apache.maven.plugins.annotations.Component; +import org.apache.maven.project.MavenProjectHelper; +import org.apache.maven.shared.artifact.filter.collection.ArtifactFilterException; +import org.apache.maven.shared.artifact.filter.collection.ArtifactIdFilter; +import org.apache.maven.shared.artifact.filter.collection.ArtifactsFilter; +import org.apache.maven.shared.artifact.filter.collection.ClassifierFilter; +import org.apache.maven.shared.artifact.filter.collection.FilterArtifacts; +import org.apache.maven.shared.artifact.filter.collection.GroupIdFilter; +import org.apache.maven.shared.artifact.filter.collection.ScopeFilter; +import org.apache.maven.shared.artifact.filter.collection.ProjectTransitivityFilter; +import org.apache.maven.shared.artifact.filter.collection.TypeFilter; +import org.codehaus.plexus.archiver.ArchiverException; +import org.codehaus.plexus.archiver.jar.JarArchiver; +import org.codehaus.plexus.archiver.jar.ManifestException; +import org.codehaus.plexus.archiver.manager.ArchiverManager; +import org.codehaus.plexus.util.FileUtils; +import org.codehaus.plexus.util.StringUtils; + +/** + * Packages the current project as an Apache NiFi Archive (NAR). + * + * The following code is derived from maven-dependencies-plugin and + * maven-jar-plugin. The functionality of CopyDependenciesMojo and JarMojo was + * simplified to the use case of NarMojo. 
+ * + */ +@Mojo(name = "nar", defaultPhase = LifecyclePhase.PACKAGE, threadSafe = false, requiresDependencyResolution = ResolutionScope.RUNTIME) +public class NarMojo extends AbstractMojo { + + private static final String[] DEFAULT_EXCLUDES = new String[]{"**/package.html"}; + private static final String[] DEFAULT_INCLUDES = new String[]{"**/**"}; + + /** + * POM + * + */ + @Parameter(property = "project", readonly = true, required = true) + protected MavenProject project; + + @Parameter(property = "session", readonly = true, required = true) + protected MavenSession session; + + /** + * List of files to include. Specified as fileset patterns. + */ + @Parameter(property = "includes") + protected String[] includes; + /** + * List of files to exclude. Specified as fileset patterns. + */ + @Parameter(property = "excludes") + protected String[] excludes; + /** + * Name of the generated NAR. + * + */ + @Parameter(alias = "narName", property = "nar.finalName", defaultValue = "${project.build.finalName}", required = true) + protected String finalName; + + /** + * The Jar archiver. + * + * \@\component role="org.codehaus.plexus.archiver.Archiver" roleHint="jar" + */ + @Component(role = org.codehaus.plexus.archiver.Archiver.class, hint = "jar") + private JarArchiver jarArchiver; + /** + * The archive configuration to use. + * + * See <a + * href="http://maven.apache.org/shared/maven-archiver/index.html">the + * documentation for Maven Archiver</a>. + * + */ + @Parameter(property = "archive") + protected final MavenArchiveConfiguration archive = new MavenArchiveConfiguration(); + /** + * Path to the default MANIFEST file to use. It will be used if + * <code>useDefaultManifestFile</code> is set to <code>true</code>. 
+ * + */ + @Parameter(property = "defaultManifestFiles", defaultValue = "${project.build.outputDirectory}/META-INF/MANIFEST.MF", readonly = true, required = true) + protected File defaultManifestFile; + + /** + * Set this to <code>true</code> to enable the use of the + * <code>defaultManifestFile</code>. + * + * @since 2.2 + */ + @Parameter(property = "nar.useDefaultManifestFile", defaultValue = "false") + protected boolean useDefaultManifestFile; + + @Component + protected MavenProjectHelper projectHelper; + + /** + * Whether creating the archive should be forced. + * + */ + @Parameter(property = "nar.forceCreation", defaultValue = "false") + protected boolean forceCreation; + + /** + * Classifier to add to the artifact generated. If given, the artifact will + * be an attachment instead. + * + */ + @Parameter(property = "classifier") + protected String classifier; + + @Component + protected ArtifactInstaller installer; + + @Component + protected ArtifactRepositoryFactory repositoryFactory; + + /** + * This only applies if the classifier parameter is used. + * + */ + @Parameter(property = "mdep.failOnMissingClassifierArtifact", defaultValue = "true", required = false) + protected boolean failOnMissingClassifierArtifact = true; + + /** + * Comma Separated list of Types to include. Empty String indicates include + * everything (default). + * + */ + @Parameter(property = "includeTypes", required = false) + protected String includeTypes; + + /** + * Comma Separated list of Types to exclude. Empty String indicates don't + * exclude anything (default). + * + */ + @Parameter(property = "excludeTypes", required = false) + protected String excludeTypes; + + /** + * Scope to include. An Empty string indicates all scopes (default). + * + */ + @Parameter(property = "includeScope", required = false) + protected String includeScope; + + /** + * Scope to exclude. An Empty string indicates no scopes (default). 
+ * + */ + @Parameter(property = "excludeScope", required = false) + protected String excludeScope; + + /** + * Comma Separated list of Classifiers to include. Empty String indicates + * include everything (default). + * + */ + @Parameter(property = "includeClassifiers", required = false) + protected String includeClassifiers; + + /** + * Comma Separated list of Classifiers to exclude. Empty String indicates + * don't exclude anything (default). + * + */ + @Parameter(property = "excludeClassifiers", required = false) + protected String excludeClassifiers; + + /** + * Specify classifier to look for. Example: sources + * + * NOTE(review): this is bound to the same "classifier" user property as the + * classifier field of this mojo, so -Dclassifier sets both; confirm that is + * intended. + * + */ + @Parameter(property = "classifier", required = false) + protected String copyDepClassifier; + + /** + * Specify type to look for when constructing artifact based on classifier. + * Example: java-source,jar,war, nar + * + */ + @Parameter(property = "type", required = false, defaultValue = "nar") + protected String type; + + /** + * Comma separated list of Artifact names to exclude. + * + */ + @Parameter(property = "excludeArtifacts", required = false) + protected String excludeArtifactIds; + + /** + * Comma separated list of Artifact names to include. + * + */ + @Parameter(property = "includeArtifacts", required = false) + protected String includeArtifactIds; + + /** + * Comma separated list of GroupId Names to exclude. Bound to its own + * excludeGroupIds user property (mirroring includeGroupIds) so that it is + * no longer populated from the artifact-id exclusion property. + * + */ + @Parameter(property = "excludeGroupIds", required = false) + protected String excludeGroupIds; + + /** + * Comma separated list of GroupIds to include. 
+ * + */ + @Parameter(property = "includeGroupIds", required = false) + protected String includeGroupIds; + + /** + * Directory to store flag files + * + */ + @Parameter(property = "markersDirectory", required = false, defaultValue = "${project.build.directory}/dependency-maven-plugin-markers") + protected File markersDirectory; + + /** + * Overwrite release artifacts + * + */ + @Parameter(property = "overWriteReleases", required = false) + protected boolean overWriteReleases; + + /** + * Overwrite snapshot artifacts + * + */ + @Parameter(property = "overWriteSnapshots", required = false) + protected boolean overWriteSnapshots; + + /** + * Overwrite artifacts that don't exist or are older than the source. + * + */ + @Parameter(property = "overWriteIfNewer", required = false, defaultValue = "true") + protected boolean overWriteIfNewer; + + /** + * Used to look up Artifacts in the remote repository. + */ + @Component + protected ArtifactFactory factory; + + /** + * Used to look up Artifacts in the remote repository. + * + */ + @Component + protected ArtifactResolver resolver; + + /** + * Artifact collector, needed to resolve dependencies. + * + */ + @Component(role = org.apache.maven.artifact.resolver.ArtifactCollector.class) + protected ArtifactCollector artifactCollector; + + @Component(role = org.apache.maven.artifact.metadata.ArtifactMetadataSource.class) + protected ArtifactMetadataSource artifactMetadataSource; + + /** + * Location of the local repository. + * + */ + @Parameter(property = "localRepository", required = true, readonly = true) + protected ArtifactRepository local; + + /** + * List of Remote Repositories used by the resolver + * + */ + @Parameter(property = "project.remoteArtifactRepositories", required = true, readonly = true) + protected List remoteRepos; + + /** + * To look up Archiver/UnArchiver implementations + * + */ + @Component + protected ArchiverManager archiverManager; + + /** + * Contains the full list of projects in the reactor. 
+ * + */ + @Parameter(property = "reactorProjects", required = true, readonly = true) + protected List reactorProjects; + + /** + * If the plugin should be silent. + * + */ + @Parameter(property = "silent", required = false, defaultValue = "false") + public boolean silent; + + /** + * Output absolute filename for resolved artifacts + * + */ + @Parameter(property = "outputAbsoluteArtifactFilename", defaultValue = "false", required = false) + protected boolean outputAbsoluteArtifactFilename; + + @Override + public void execute() throws MojoExecutionException, MojoFailureException { + copyDependencies(); + makeNar(); + } + + private void copyDependencies() throws MojoExecutionException { + DependencyStatusSets dss = getDependencySets(this.failOnMissingClassifierArtifact); + Set artifacts = dss.getResolvedDependencies(); + + for (Object artifactObj : artifacts) { + copyArtifact((Artifact) artifactObj); + } + + artifacts = dss.getSkippedDependencies(); + for (Object artifactOjb : artifacts) { + Artifact artifact = (Artifact) artifactOjb; + getLog().info(artifact.getFile().getName() + " already exists in destination."); + } + } + + protected void copyArtifact(Artifact artifact) throws MojoExecutionException { + String destFileName = DependencyUtil.getFormattedFileName(artifact, false); + final File destDir = DependencyUtil.getFormattedOutputDirectory(false, false, false, false, false, getDependenciesDirectory(), artifact); + final File destFile = new File(destDir, destFileName); + copyFile(artifact.getFile(), destFile); + } + + protected Artifact getResolvedPomArtifact(Artifact artifact) { + Artifact pomArtifact = this.factory.createArtifact(artifact.getGroupId(), artifact.getArtifactId(), artifact.getVersion(), "", "pom"); + // Resolve the pom artifact using repos + try { + this.resolver.resolve(pomArtifact, this.remoteRepos, this.local); + } catch (ArtifactResolutionException | ArtifactNotFoundException e) { + getLog().info(e.getMessage()); + } + return pomArtifact; + 
} + + protected ArtifactsFilter getMarkedArtifactFilter() { + return new DestFileFilter(this.overWriteReleases, this.overWriteSnapshots, this.overWriteIfNewer, false, false, false, false, false, getDependenciesDirectory()); + } + + protected DependencyStatusSets getDependencySets(boolean stopOnFailure) throws MojoExecutionException { + // add filters in well known order, least specific to most specific + FilterArtifacts filter = new FilterArtifacts(); + + filter.addFilter(new ProjectTransitivityFilter(project.getDependencyArtifacts(), false)); + filter.addFilter(new ScopeFilter(this.includeScope, this.excludeScope)); + filter.addFilter(new TypeFilter(this.includeTypes, this.excludeTypes)); + filter.addFilter(new ClassifierFilter(this.includeClassifiers, this.excludeClassifiers)); + filter.addFilter(new GroupIdFilter(this.includeGroupIds, this.excludeGroupIds)); + filter.addFilter(new ArtifactIdFilter(this.includeArtifactIds, this.excludeArtifactIds)); + + // explicitly filter our nar dependencies + filter.addFilter(new TypeFilter("", "nar")); + + // start with all artifacts. 
+ Set artifacts = project.getArtifacts(); + + // perform filtering + try { + artifacts = filter.filter(artifacts); + } catch (ArtifactFilterException e) { + throw new MojoExecutionException(e.getMessage(), e); + } + + // transform artifacts if classifier is set + final DependencyStatusSets status; + if (StringUtils.isNotEmpty(copyDepClassifier)) { + status = getClassifierTranslatedDependencies(artifacts, stopOnFailure); + } else { + status = filterMarkedDependencies(artifacts); + } + + return status; + } + + protected DependencyStatusSets getClassifierTranslatedDependencies(Set artifacts, boolean stopOnFailure) throws MojoExecutionException { + Set unResolvedArtifacts = new HashSet(); + Set resolvedArtifacts = artifacts; + DependencyStatusSets status = new DependencyStatusSets(); + + // possibly translate artifacts into a new set of artifacts based on the + // classifier and type + // if this did something, we need to resolve the new artifacts + if (StringUtils.isNotEmpty(copyDepClassifier)) { + ArtifactTranslator translator = new ClassifierTypeTranslator(this.copyDepClassifier, this.type, this.factory); + artifacts = translator.translate(artifacts, getLog()); + + status = filterMarkedDependencies(artifacts); + + // the unskipped artifacts are in the resolved set. + artifacts = status.getResolvedDependencies(); + + // resolve the rest of the artifacts + ArtifactsResolver artifactsResolver = new DefaultArtifactsResolver(this.resolver, this.local, + this.remoteRepos, stopOnFailure); + resolvedArtifacts = artifactsResolver.resolve(artifacts, getLog()); + + // calculate the artifacts not resolved. + unResolvedArtifacts.addAll(artifacts); + unResolvedArtifacts.removeAll(resolvedArtifacts); + } + + // return a bean of all 3 sets. 
+ status.setResolvedDependencies(resolvedArtifacts); + status.setUnResolvedDependencies(unResolvedArtifacts); + + return status; + } + + protected DependencyStatusSets filterMarkedDependencies(Set artifacts) throws MojoExecutionException { + // remove files that have markers already + FilterArtifacts filter = new FilterArtifacts(); + filter.clearFilters(); + filter.addFilter(getMarkedArtifactFilter()); + + Set unMarkedArtifacts; + try { + unMarkedArtifacts = filter.filter(artifacts); + } catch (ArtifactFilterException e) { + throw new MojoExecutionException(e.getMessage(), e); + } + + // calculate the skipped artifacts + Set skippedArtifacts = new HashSet(); + skippedArtifacts.addAll(artifacts); + skippedArtifacts.removeAll(unMarkedArtifacts); + + return new DependencyStatusSets(unMarkedArtifacts, null, skippedArtifacts); + } + + protected void copyFile(File artifact, File destFile) throws MojoExecutionException { + try { + getLog().info("Copying " + (this.outputAbsoluteArtifactFilename ? 
artifact.getAbsolutePath() : artifact.getName()) + " to " + destFile); + FileUtils.copyFile(artifact, destFile); + } catch (Exception e) { + throw new MojoExecutionException("Error copying artifact from " + artifact + " to " + destFile, e); + } + } + + private File getClassesDirectory() { + final File outputDirectory = new File(project.getBasedir(), "target"); + return new File(outputDirectory, "classes"); + } + + private File getDependenciesDirectory() { + return new File(getClassesDirectory(), "META-INF/dependencies"); + } + + private void makeNar() throws MojoExecutionException { + File narFile = createArchive(); + + if (classifier != null) { + projectHelper.attachArtifact(project, "nar", classifier, narFile); + } else { + project.getArtifact().setFile(narFile); + } + } + + public File createArchive() throws MojoExecutionException { + final File outputDirectory = new File(project.getBasedir(), "target"); + File narFile = getNarFile(outputDirectory, finalName, classifier); + MavenArchiver archiver = new MavenArchiver(); + archiver.setArchiver(jarArchiver); + archiver.setOutputFile(narFile); + archive.setForced(forceCreation); + + try { + File contentDirectory = getClassesDirectory(); + if (!contentDirectory.exists()) { + getLog().warn("NAR will be empty - no content was marked for inclusion!"); + } else { + archiver.getArchiver().addDirectory(contentDirectory, getIncludes(), getExcludes()); + } + + File existingManifest = defaultManifestFile; + if (useDefaultManifestFile && existingManifest.exists() && archive.getManifestFile() == null) { + getLog().info("Adding existing MANIFEST to archive. 
Found under: " + existingManifest.getPath()); + archive.setManifestFile(existingManifest); + } + + // automatically add the artifact id to the manifest + archive.addManifestEntry("Nar-Id", project.getArtifactId()); + + // look for a nar dependency + String narDependency = getNarDependency(); + if (narDependency != null) { + archive.addManifestEntry("Nar-Dependency-Id", narDependency); + } + + archiver.createArchive(session, project, archive); + return narFile; + } catch (ArchiverException | MojoExecutionException | ManifestException | IOException | DependencyResolutionRequiredException e) { + throw new MojoExecutionException("Error assembling NAR", e); + } + } + + private String[] getIncludes() { + if (includes != null && includes.length > 0) { + return includes; + } + return DEFAULT_INCLUDES; + } + + private String[] getExcludes() { + if (excludes != null && excludes.length > 0) { + return excludes; + } + return DEFAULT_EXCLUDES; + } + + protected File getNarFile(File basedir, String finalName, String classifier) { + if (classifier == null) { + classifier = ""; + } else if (classifier.trim().length() > 0 && !classifier.startsWith("-")) { + classifier = "-" + classifier; + } + + return new File(basedir, finalName + classifier + ".nar"); + } + + private String getNarDependency() throws MojoExecutionException { + String narDependency = null; + + // get nar dependencies + FilterArtifacts filter = new FilterArtifacts(); + filter.addFilter(new TypeFilter("nar", "")); + + // start with all artifacts. + Set artifacts = project.getArtifacts(); + + // perform filtering + try { + artifacts = filter.filter(artifacts); + } catch (ArtifactFilterException e) { + throw new MojoExecutionException(e.getMessage(), e); + } + + // ensure there is a single nar dependency + if (artifacts.size() > 1) { + throw new MojoExecutionException("Each NAR represents a ClassLoader. A NAR dependency allows that NAR's ClassLoader to be " + + "used as the parent of this NAR's ClassLoader. 
As a result, only a single NAR dependency is allowed."); + } else if (artifacts.size() == 1) { + final Artifact artifact = (Artifact) artifacts.iterator().next(); + narDependency = artifact.getArtifactId(); + } + + return narDependency; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml ---------------------------------------------------------------------- diff --git a/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml b/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml new file mode 100644 index 0000000..0680d18 --- /dev/null +++ b/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml @@ -0,0 +1,52 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<component-set> + <components> + <component> + <role>org.apache.maven.lifecycle.mapping.LifecycleMapping</role> + <role-hint>nar</role-hint> + <implementation>org.apache.maven.lifecycle.mapping.DefaultLifecycleMapping</implementation> + <configuration> + <lifecycles> + <lifecycle> + <id>default</id> + <phases> + <process-resources>org.apache.maven.plugins:maven-resources-plugin:resources</process-resources> + <compile>org.apache.maven.plugins:maven-compiler-plugin:compile</compile> + <process-test-resources>org.apache.maven.plugins:maven-resources-plugin:testResources</process-test-resources> + <test-compile>org.apache.maven.plugins:maven-compiler-plugin:testCompile</test-compile> + <test>org.apache.maven.plugins:maven-surefire-plugin:test</test> + <package>org.apache.nifi:nar-maven-plugin:nar</package> + <install>org.apache.maven.plugins:maven-install-plugin:install</install> + <deploy>org.apache.maven.plugins:maven-deploy-plugin:deploy</deploy> + </phases> + </lifecycle> + </lifecycles> + </configuration> + </component> + <component> + <role>org.apache.maven.artifact.handler.ArtifactHandler</role> + <role-hint>nar</role-hint> + <implementation>org.apache.maven.artifact.handler.DefaultArtifactHandler</implementation> + <configuration> + <type>nar</type> + <language>java</language> + <addedToClasspath>false</addedToClasspath> + <includesDependencies>true</includesDependencies> + </configuration> + </component> + </components> +</component-set> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml new file mode 100644 index 0000000..6280349 --- /dev/null +++ 
b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml @@ -0,0 +1,67 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-services-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>distributed-cache-client-service</artifactId> + <packaging>jar</packaging> + + <name>Distributed Cache Client Service</name> + <description>Provides a Client for interfacing with a Distributed Cache</description> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-protocol</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>remote-communications-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-stream-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.9</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java 
b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java new file mode 100644 index 0000000..f838c2f --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.client; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +public interface CommsSession extends Closeable { + + void setTimeout(final long value, final TimeUnit timeUnit); + + InputStream getInputStream() throws IOException; + + OutputStream getOutputStream() throws IOException; + + boolean isClosed(); + + void interrupt(); + + String getHostname(); + + int getPort(); + + long getTimeout(TimeUnit timeUnit); + + SSLContext getSSLContext(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java new file mode 100644 index 0000000..ee96660 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.distributed.cache.client;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.io.ByteArrayOutputStream;
import org.apache.nifi.io.DataOutputStream;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Controller service that implements {@link DistributedMapCacheClient} by exchanging
 * length-delimited requests and responses with a remote server over a {@link CommsSession}.
 *
 * <p>
 * Sessions are leased from an internal queue. A session that is still open after a request
 * completes is put back on the queue for reuse; a session on which a request failed is closed
 * and discarded so that a connection in an unknown protocol state is never reused.
 * </p>
 */
public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {

    private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);

    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
            .name("Server Hostname")
            .description("The name of the server that is running the DistributedMapCacheServer service")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
            .name("Server Port")
            .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
            .required(true)
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .defaultValue("4557")
            .build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description(
                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
            .required(false)
            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
            .defaultValue(null)
            .build();
    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Communications Timeout")
            .description(
                    "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("30 secs")
            .build();

    /** Idle sessions available for reuse by the next request. */
    private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
    /** Configuration captured by {@link #cacheConfig(ConfigurationContext)}; used to open new sessions. */
    private volatile ConfigurationContext configContext;
    /** Set once {@link #close()} has been called; further requests are rejected. */
    private volatile boolean closed = false;

    /**
     * @return the hostname, port, SSL context service and communications timeout descriptors
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(HOSTNAME);
        descriptors.add(PORT);
        descriptors.add(SSL_CONTEXT_SERVICE);
        descriptors.add(COMMUNICATIONS_TIMEOUT);
        return descriptors;
    }

    /**
     * Remembers the configuration so that sessions can be created lazily on first use.
     *
     * @param context the configuration of this service
     */
    @OnConfigured
    public void cacheConfig(final ConfigurationContext context) {
        this.configContext = context;
    }

    /**
     * Sends a {@code putIfAbsent} request carrying the serialized key and value.
     *
     * @return the boolean response read from the server
     * @throws IOException if communication with the server fails
     */
    @Override
    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
            throws IOException {
        return withCommsSession(new CommsAction<Boolean>() {
            @Override
            public Boolean execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("putIfAbsent");

                serialize(key, keySerializer, dos);
                serialize(value, valueSerializer, dos);

                dos.flush();

                final DataInputStream dis = new DataInputStream(session.getInputStream());
                return dis.readBoolean();
            }
        });
    }

    /**
     * Sends a {@code containsKey} request carrying the serialized key.
     *
     * @return the boolean response read from the server
     * @throws IOException if communication with the server fails
     */
    @Override
    public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
        return withCommsSession(new CommsAction<Boolean>() {
            @Override
            public Boolean execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("containsKey");

                serialize(key, keySerializer, dos);
                dos.flush();

                final DataInputStream dis = new DataInputStream(session.getInputStream());
                return dis.readBoolean();
            }
        });
    }

    /**
     * Sends a {@code getAndPutIfAbsent} request carrying the serialized key and value, then
     * hands the length-delimited response to {@code valueDeserializer}.
     *
     * @return whatever {@code valueDeserializer} produces from the response bytes
     * @throws IOException if communication with the server fails
     */
    @Override
    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
            final Deserializer<V> valueDeserializer) throws IOException {
        return withCommsSession(new CommsAction<V>() {
            @Override
            public V execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("getAndPutIfAbsent");

                serialize(key, keySerializer, dos);
                serialize(value, valueSerializer, dos);
                dos.flush();

                // read response
                final DataInputStream dis = new DataInputStream(session.getInputStream());
                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
                return valueDeserializer.deserialize(responseBuffer);
            }
        });
    }

    /**
     * Sends a {@code get} request carrying the serialized key, then hands the length-delimited
     * response to {@code valueDeserializer}.
     *
     * @return whatever {@code valueDeserializer} produces from the response bytes
     * @throws IOException if communication with the server fails
     */
    @Override
    public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
        return withCommsSession(new CommsAction<V>() {
            @Override
            public V execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("get");

                serialize(key, keySerializer, dos);
                dos.flush();

                // read response
                final DataInputStream dis = new DataInputStream(session.getInputStream());
                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
                return valueDeserializer.deserialize(responseBuffer);
            }
        });
    }

    /**
     * Sends a {@code remove} request carrying the serialized key.
     *
     * @return the boolean response read from the server
     * @throws IOException if communication with the server fails
     */
    @Override
    public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
        return withCommsSession(new CommsAction<Boolean>() {
            @Override
            public Boolean execute(final CommsSession session) throws IOException {
                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
                dos.writeUTF("remove");

                serialize(key, serializer, dos);
                dos.flush();

                // read response
                final DataInputStream dis = new DataInputStream(session.getInputStream());
                return dis.readBoolean();
            }
        });
    }

    /**
     * Reads a 4-byte length followed by exactly that many bytes.
     *
     * @param dis the stream to read from
     * @return the response payload (possibly empty)
     * @throws IOException if the stream fails, ends early, or announces a negative length
     */
    private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
        final int responseLength = dis.readInt();
        if (responseLength < 0) {
            // Report a malformed frame as a communications failure instead of letting
            // the array allocation throw NegativeArraySizeException.
            throw new IOException("Received invalid response length from server: " + responseLength);
        }

        final byte[] responseBuffer = new byte[responseLength];
        dis.readFully(responseBuffer);
        return responseBuffer;
    }

    /**
     * Opens a new session to the configured host and port, using SSL when an SSL Context
     * Service is configured, and applies the configured communications timeout.
     *
     * @param context the configuration to read the connection properties from
     * @return the newly created session (no handshake has been performed on it yet)
     * @throws IOException if the session cannot be created
     */
    public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
        final String hostname = context.getProperty(HOSTNAME).getValue();
        final int port = context.getProperty(PORT).asInteger();
        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

        final CommsSession commsSession;
        if (sslContextService == null) {
            commsSession = new StandardCommsSession(hostname, port);
        } else {
            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
        }

        commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        return commsSession;
    }

    /**
     * Returns an open pooled session if one is available; otherwise opens a new session and
     * performs the protocol handshake on it.
     *
     * @return a session ready to carry a request
     * @throws IOException if the session cannot be created or the handshake fails
     */
    private CommsSession leaseCommsSession() throws IOException {
        CommsSession session = queue.poll();
        if (session != null && !session.isClosed()) {
            return session;
        }

        session = createCommsSession(configContext);
        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
        try {
            ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
        } catch (final HandshakeException e) {
            closeQuietly(session);
            throw new IOException(e);
        } catch (final IOException | RuntimeException e) {
            // The session was opened by this method and is not yet visible to anyone else,
            // so it must be released here or the connection would leak.
            closeQuietly(session);
            throw e;
        }

        return session;
    }

    /**
     * Marks this client as closed and shuts down every pooled session, first sending a
     * {@code close} request on each one on a best-effort basis.
     */
    @Override
    public void close() throws IOException {
        this.closed = true;

        CommsSession commsSession;
        while ((commsSession = queue.poll()) != null) {
            try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
                dos.writeUTF("close");
                dos.flush();
            } catch (final IOException e) {
                // Notifying the server is best-effort; the session is closed below regardless.
                logger.debug("Failed to send close request to server", e);
            } finally {
                // Always release the session, even when the close request could not be written.
                closeQuietly(commsSession);
            }
        }
        logger.info("Closed {}", getIdentifier());
    }

    /**
     * Safety net that closes the client if it was never closed explicitly.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (!closed) {
                close();
            }
            logger.debug("Finalize called");
        } finally {
            super.finalize();
        }
    }

    /**
     * Writes {@code value} to {@code dos} as a 4-byte length followed by the serialized bytes.
     * The value is serialized into a buffer first so that its length is known up front.
     */
    private <T> void serialize(final T value, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        serializer.serialize(value, baos);
        dos.writeInt(baos.size());
        baos.writeTo(dos);
    }

    /**
     * Runs {@code action} against a leased session and takes care of the session afterwards:
     * it is closed if the action failed or this client has been closed in the meantime, and
     * returned to the pool otherwise.
     *
     * @param action the request/response exchange to perform
     * @return the result of the action
     * @throws IOException if leasing the session or executing the action fails
     * @throws IllegalStateException if this client has already been closed
     */
    private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
        if (closed) {
            throw new IllegalStateException("Client is closed");
        }

        final CommsSession session = leaseCommsSession();
        try {
            return action.execute(session);
        } catch (final IOException | RuntimeException e) {
            // A failure part-way through a request (including an unchecked exception from a
            // serializer or deserializer) leaves the stream in an unknown state, so the
            // session must not go back into the pool.
            closeQuietly(session);
            throw e;
        } finally {
            if (!session.isClosed()) {
                if (this.closed) {
                    closeQuietly(session);
                } else {
                    queue.offer(session);
                }
            }
        }
    }

    /**
     * Closes the session, logging rather than propagating any failure; used on cleanup paths
     * where a more relevant exception is already being reported or nothing can be done.
     */
    private static void closeQuietly(final CommsSession session) {
        try {
            session.close();
        } catch (final IOException ignored) {
            logger.debug("Failed to close communications session", ignored);
        }
    }

    /** A single request/response exchange performed on a leased session. */
    private static interface CommsAction<T> {

        T execute(CommsSession commsSession) throws IOException;
    }

}
