http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java new file mode 100644 index 0000000..8f186fd --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Standard configuration context to be passed to onConfigured method of Providers. 
+ */
+public class StandardProviderConfigurationContext implements ProviderConfigurationContext {
+
+    /** Read-only copy of the provider properties, taken when the context is constructed. */
+    private final Map<String, String> properties;
+
+    /**
+     * Creates a context holding an unmodifiable copy of the given properties.
+     *
+     * @param properties the name/value pairs to expose to the provider
+     */
+    public StandardProviderConfigurationContext(final Map<String, String> properties) {
+        final Map<String, String> snapshot = new HashMap<>(properties);
+        this.properties = Collections.unmodifiableMap(snapshot);
+    }
+
+    /**
+     * @return the unmodifiable property map captured by the constructor
+     */
+    @Override
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java new file mode 100644 index 0000000..1a83d68 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.provider; + +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.metadata.MetadataProvider; +import org.apache.nifi.registry.properties.NiFiRegistryProperties; +import org.apache.nifi.registry.provider.generated.Property; +import org.apache.nifi.registry.provider.generated.Providers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +import javax.xml.XMLConstants; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import java.io.File; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Standard implementation of ProviderFactory. + */ +public class StandardProviderFactory implements ProviderFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardProviderFactory.class); + + private static final String PROVIDERS_XSD = "/providers.xsd"; + private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.provider.generated"; + private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext(); + + /** + * Load the JAXBContext. 
+ */ + private static JAXBContext initializeJaxbContext() { + try { + return JAXBContext.newInstance(JAXB_GENERATED_PATH, StandardProviderFactory.class.getClassLoader()); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext."); + } + } + + private final NiFiRegistryProperties properties; + private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null); + + private FlowPersistenceProvider flowPersistenceProvider; + private MetadataProvider metadataProvider; + + public StandardProviderFactory(final NiFiRegistryProperties properties) { + this.properties = properties; + + if (this.properties == null) { + throw new IllegalStateException("NiFiRegistryProperties cannot be null"); + } + } + + @Override + public synchronized void initialize() throws ProviderFactoryException { + if (providersHolder.get() == null) { + final File providersConfigFile = properties.getProvidersConfigurationFile(); + if (providersConfigFile.exists()) { + try { + // find the schema + final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + final Schema schema = schemaFactory.newSchema(StandardProviderFactory.class.getResource(PROVIDERS_XSD)); + + // attempt to unmarshal + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + unmarshaller.setSchema(schema); + + // set the holder for later use + final JAXBElement<Providers> element = unmarshaller.unmarshal(new StreamSource(providersConfigFile), Providers.class); + providersHolder.set(element.getValue()); + } catch (SAXException | JAXBException e) { + throw new ProviderFactoryException("Unable to load the providers configuration file at: " + providersConfigFile.getAbsolutePath(), e); + } + } else { + throw new ProviderFactoryException("Unable to find the providers configuration file at " + providersConfigFile.getAbsolutePath()); + } + } + } + + @Override + public synchronized MetadataProvider getMetadataProvider() { + if 
(metadataProvider == null) { + if (providersHolder.get() == null) { + throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider"); + } + + final Providers providers = providersHolder.get(); + final org.apache.nifi.registry.provider.generated.Provider jaxbMetadataProvider = providers.getMetadataProvider(); + final String metadataProviderClassName = jaxbMetadataProvider.getClazz(); + + try { + final Class<?> rawMetadataProviderClass = Class.forName(metadataProviderClassName, true, StandardProviderFactory.class.getClassLoader()); + final Class<? extends MetadataProvider> metadataProviderClass = rawMetadataProviderClass.asSubclass(MetadataProvider.class); + + // otherwise create a new instance + final Constructor constructor = metadataProviderClass.getConstructor(); + metadataProvider = (MetadataProvider) constructor.newInstance(); + + LOGGER.info("Instantiated MetadataProvider with class name {}", new Object[] {metadataProviderClassName}); + } catch (Exception e) { + throw new ProviderFactoryException("Error creating MetadataProvider with class name: " + metadataProviderClassName, e); + } + + final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbMetadataProvider.getProperty()); + metadataProvider.onConfigured(configurationContext); + LOGGER.info("Configured MetadataProvider with class name {}", new Object[] {metadataProviderClassName}); + } + + return metadataProvider; + } + + @Override + public synchronized FlowPersistenceProvider getFlowPersistenceProvider() { + if (flowPersistenceProvider == null) { + if (providersHolder.get() == null) { + throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider"); + } + + final Providers providers = providersHolder.get(); + final org.apache.nifi.registry.provider.generated.Provider jaxbFlowProvider = providers.getFlowPersistenceProvider(); + final String flowProviderClassName = jaxbFlowProvider.getClazz(); + + 
try { + final Class<?> rawFlowProviderClass = Class.forName(flowProviderClassName, true, StandardProviderFactory.class.getClassLoader()); + final Class<? extends FlowPersistenceProvider> flowProviderClass = rawFlowProviderClass.asSubclass(FlowPersistenceProvider.class); + + final Constructor constructor = flowProviderClass.getConstructor(); + flowPersistenceProvider = (FlowPersistenceProvider) constructor.newInstance(); + + LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName}); + } catch (Exception e) { + throw new ProviderFactoryException("Error creating FlowPersistenceProvider with class name: " + flowProviderClassName, e); + } + + final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbFlowProvider.getProperty()); + flowPersistenceProvider.onConfigured(configurationContext); + LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName}); + } + + return flowPersistenceProvider; + } + + private ProviderConfigurationContext createConfigurationContext(final List<Property> configProperties) { + final Map<String,String> properties = new HashMap<>(); + + if (configProperties != null) { + configProperties.stream().forEach(p -> properties.put(p.getName(), p.getValue())); + } + + return new StandardProviderConfigurationContext(properties); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/main/xsd/providers.xsd ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/xsd/providers.xsd b/nifi-registry-framework/src/main/xsd/providers.xsd new file mode 100644 index 0000000..cb71ed8 --- /dev/null +++ b/nifi-registry-framework/src/main/xsd/providers.xsd @@ -0,0 +1,51 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> + + <!-- Provider type --> + <xs:complexType name="Provider"> + <xs:sequence> + <xs:element name="class" type="NonEmptyStringType"/> + <xs:element name="property" type="Property" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + + <!-- Name/Value properties--> + <xs:complexType name="Property"> + <xs:simpleContent> + <xs:extension base="xs:string"> + <xs:attribute name="name" type="NonEmptyStringType"></xs:attribute> + </xs:extension> + </xs:simpleContent> + </xs:complexType> + + <xs:simpleType name="NonEmptyStringType"> + <xs:restriction base="xs:string"> + <xs:minLength value="1"/> + </xs:restriction> + </xs:simpleType> + + <!-- providers --> + <xs:element name="providers"> + <xs:complexType> + <xs:sequence> + <xs:element name="metadataProvider" type="Provider" minOccurs="1" maxOccurs="1"/> + <xs:element name="flowPersistenceProvider" type="Provider" minOccurs="1" maxOccurs="1" /> + </xs:sequence> + </xs:complexType> + </xs:element> + +</xs:schema> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java ---------------------------------------------------------------------- diff 
--git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java new file mode 100644 index 0000000..e0c7f16 --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.flow; + +import org.junit.Assert; +import org.junit.Test; + +public class TestStandardFlowSnapshotContext { + + @Test + public void testBuilder() { + final String bucketId = "1234-1234-1234-1234"; + final String bucketName = "Some Bucket"; + final String flowId = "2345-2345-2345-2345"; + final String flowName = "Some Flow"; + final int version = 2; + final String comments = "Some Comments"; + final long timestamp = System.currentTimeMillis(); + + final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder() + .bucketId(bucketId) + .bucketName(bucketName) + .flowId(flowId) + .flowName(flowName) + .version(version) + .comments(comments) + .snapshotTimestamp(timestamp) + .build(); + + Assert.assertEquals(bucketId, context.getBucketId()); + Assert.assertEquals(bucketName, context.getBucketName()); + Assert.assertEquals(flowId, context.getFlowId()); + Assert.assertEquals(flowName, context.getFlowName()); + Assert.assertEquals(version, context.getVersion()); + Assert.assertEquals(comments, context.getComments()); + Assert.assertEquals(timestamp, context.getSnapshotTimestamp()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java new file mode 100644 index 0000000..2eed54f --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.flow.FlowPersistenceException; + +import java.util.Map; + +public class MockFlowPersistenceProvider implements FlowPersistenceProvider { + + private Map<String,String> properties; + + @Override + public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException { + properties = configurationContext.getProperties(); + } + + @Override + public void saveSnapshot(FlowSnapshotContext context, byte[] content) throws FlowPersistenceException { + + } + + @Override + public byte[] getSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException { + return new byte[0]; + } + + @Override + public void deleteSnapshots(String bucketId, String flowId) throws FlowPersistenceException { + + } + + @Override + public void deleteSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException { + + } + + public Map<String,String> getProperties() { + return properties; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java new file mode 100644 index 0000000..06d3e1a --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.provider; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.metadata.MetadataProvider; + +import java.util.Map; +import java.util.Set; + +public class MockMetadataProvider implements MetadataProvider { + + private Map<String,String> properties; + + @Override + public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException { + this.properties = configurationContext.getProperties(); + } + + public Map<String,String> getProperties() { + return properties; + } + + @Override + public Bucket createBucket(Bucket bucket) { + return null; + } + + @Override + public Bucket getBucket(String bucketIdentifier) { + return null; + } + + @Override + public Set<Bucket> getBuckets() { + return null; + } + + @Override + public Bucket updateBucket(Bucket bucket) { + return null; + } + + @Override + public void deleteBucket(String bucketIdentifier) { + + } + + @Override + public VersionedFlow createFlow(String bucketIdentifier, VersionedFlow flow) { + return null; + } + + @Override + public VersionedFlow getFlow(String flowIdentifier) { + return null; + } + + @Override + public Set<VersionedFlow> getFlows() { + return null; + } + + @Override + public Set<VersionedFlow> getFlows(String bucketId) { + return null; + } + + @Override + public VersionedFlow updateFlow(VersionedFlow versionedFlow) { + return null; + } + + @Override + public void deleteFlow(String flowIdentifier) { + + } + + @Override + public VersionedFlowSnapshot createFlowSnapshot(VersionedFlowSnapshot flowSnapshot) { + return null; + } + + @Override + public VersionedFlowSnapshot getFlowSnapshot(String flowIdentifier, Integer version) { + return null; + } + + @Override + public void deleteFlowSnapshot(String flowIdentifier, Integer version) { + + } +} 
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java new file mode 100644 index 0000000..2bec5ba --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.provider; + +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.metadata.MetadataProvider; +import org.apache.nifi.registry.properties.NiFiRegistryProperties; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestStandardProviderFactory { + + @Test + public void testGetProvidersSuccess() { + final NiFiRegistryProperties props = new NiFiRegistryProperties(); + props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-good.xml"); + + final ProviderFactory providerFactory = new StandardProviderFactory(props); + providerFactory.initialize(); + + final MetadataProvider metadataProvider = providerFactory.getMetadataProvider(); + assertNotNull(metadataProvider); + assertTrue(metadataProvider instanceof MockMetadataProvider); + + final MockMetadataProvider mockMetadataProvider = (MockMetadataProvider) metadataProvider; + assertNotNull(mockMetadataProvider.getProperties()); + assertEquals("metadata foo", mockMetadataProvider.getProperties().get("Metadata Property 1")); + assertEquals("metadata bar", mockMetadataProvider.getProperties().get("Metadata Property 2")); + + final FlowPersistenceProvider flowPersistenceProvider = providerFactory.getFlowPersistenceProvider(); + assertNotNull(flowPersistenceProvider); + + final MockFlowPersistenceProvider mockFlowProvider = (MockFlowPersistenceProvider) flowPersistenceProvider; + assertNotNull(mockFlowProvider.getProperties()); + assertEquals("flow foo", mockFlowProvider.getProperties().get("Flow Property 1")); + assertEquals("flow bar", mockFlowProvider.getProperties().get("Flow Property 2")); + } + + @Test(expected = ProviderFactoryException.class) + public void testGetMetadataProviderBeforeInitializingShouldThrowException() { + final NiFiRegistryProperties props = new 
NiFiRegistryProperties(); + props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-good.xml"); + + final ProviderFactory providerFactory = new StandardProviderFactory(props); + providerFactory.getMetadataProvider(); + } + + @Test(expected = ProviderFactoryException.class) + public void testGetFlowProviderBeforeInitializingShouldThrowException() { + final NiFiRegistryProperties props = new NiFiRegistryProperties(); + props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-good.xml"); + + final ProviderFactory providerFactory = new StandardProviderFactory(props); + providerFactory.getFlowPersistenceProvider(); + } + + @Test(expected = ProviderFactoryException.class) + public void testProvidersConfigDoesNotExist() { + final NiFiRegistryProperties props = new NiFiRegistryProperties(); + props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-does-not-exist.xml"); + + final ProviderFactory providerFactory = new StandardProviderFactory(props); + providerFactory.initialize(); + } + + @Test(expected = ProviderFactoryException.class) + public void testMetadataProviderClassNotFound() { + final NiFiRegistryProperties props = new NiFiRegistryProperties(); + props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-class-not-found.xml"); + + final ProviderFactory providerFactory = new StandardProviderFactory(props); + providerFactory.initialize(); + + providerFactory.getMetadataProvider(); + } + + @Test(expected = ProviderFactoryException.class) + public void testFlowProviderClassNotFound() { + final NiFiRegistryProperties props = new NiFiRegistryProperties(); + props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-class-not-found.xml"); + + final ProviderFactory providerFactory = new 
StandardProviderFactory(props); + providerFactory.initialize(); + + providerFactory.getFlowPersistenceProvider(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml b/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml new file mode 100644 index 0000000..8b5debe --- /dev/null +++ b/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<providers> + + <metadataProvider> + <class>org.apache.nifi.registry.provider.MetadataProviderXXX</class> + <property name="Metadata Property 1">foo</property> + <property name="Metadata Property 2">bar</property> + </metadataProvider> + + <flowPersistenceProvider> + <class>org.apache.nifi.registry.provider.FlowProviderXXX</class> + <property name="Flow Property 1">foo</property> + <property name="Flow Property 2">bar</property> + </flowPersistenceProvider> + +</providers> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/resources/provider/providers-good.xml ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/provider/providers-good.xml b/nifi-registry-framework/src/test/resources/provider/providers-good.xml new file mode 100644 index 0000000..4ef2a06 --- /dev/null +++ b/nifi-registry-framework/src/test/resources/provider/providers-good.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<providers> + + <metadataProvider> + <class>org.apache.nifi.registry.provider.MockMetadataProvider</class> + <property name="Metadata Property 1">metadata foo</property> + <property name="Metadata Property 2">metadata bar</property> + </metadataProvider> + + <flowPersistenceProvider> + <class>org.apache.nifi.registry.provider.MockFlowPersistenceProvider</class> + <property name="Flow Property 1">flow foo</property> + <property name="Flow Property 2">flow bar</property> + </flowPersistenceProvider> + +</providers> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java ---------------------------------------------------------------------- diff --git a/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java b/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java index 5efea84..35a1b48 100644 --- a/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java +++ b/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java @@ -45,9 +45,12 @@ public class NiFiRegistryProperties extends Properties { public static final String SECURITY_NEED_CLIENT_AUTH = "nifi.registry.security.needClientAuth"; public static final String SECURITY_AUTHORIZED_USERS = "nifi.registry.security.authorized.users"; + public static final String PROVIDERS_CONFIGURATION_FILE = "nifi.registry.providers.configuration.file"; + // Defaults public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty"; public static final String DEFAULT_WAR_DIR = "./lib"; + public static final String DEFAULT_PROVIDERS_CONFIGURATION_FILE = "./conf/providers.xml"; public int getWebThreads() { int webThreads = 200; @@ -144,4 +147,12 @@ public class NiFiRegistryProperties extends Properties { 
return new File(authorizedUsersFile); } + public File getProvidersConfigurationFile() { + final String value = getProperty(PROVIDERS_CONFIGURATION_FILE); + if (StringUtils.isBlank(value)) { + return new File(DEFAULT_PROVIDERS_CONFIGURATION_FILE); + } else { + return new File(value); + } + } } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/pom.xml b/nifi-registry-provider-api/pom.xml new file mode 100644 index 0000000..eba6cfa --- /dev/null +++ b/nifi-registry-provider-api/pom.xml @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>nifi-registry-provider-api</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-data-model</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java new file mode 100644 index 0000000..4287fc8 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.flow; + +/** + * An Exception for errors encountered when a FlowPersistenceProvider saves or retrieves a flow. + */ +public class FlowPersistenceException extends RuntimeException { + + public FlowPersistenceException(String message) { + super(message); + } + + public FlowPersistenceException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java new file mode 100644 index 0000000..8648722 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.flow; + +import org.apache.nifi.registry.provider.Provider; + +/** + * A service that can store and retrieve versioned flow snapshots. + * + * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may + * change across releases until the registry matures. + */ +public interface FlowPersistenceProvider extends Provider { + + /** + * Persists a serialized versioned flow snapshot. + * + * @param context the context for the snapshot being persisted + * @param content the serialized snapshot to persist + * @throws FlowPersistenceException if the snapshot could not be persisted + */ + void saveSnapshot(FlowSnapshotContext context, byte[] content) throws FlowPersistenceException; + + /** + * Retrieves a versioned flow snapshot. + * + * @param bucketId the bucket id where the snapshot is located + * @param flowId the id of the versioned flow the snapshot belongs to + * @param version the version of the snapshot + * @return the bytes for the requested snapshot, or null if not found + * @throws FlowPersistenceException if the snapshot could not be retrieved due to an error in underlying provider + */ + byte[] getSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException; + + /** + * Deletes all snapshots for the versioned flow with the given id. 
+ * + * @param bucketId the bucket the versioned flow belongs to + * @param flowId the id of the versioned flow + * @throws FlowPersistenceException if the snapshots could not be deleted due to an error in underlying provider + */ + void deleteSnapshots(String bucketId, String flowId) throws FlowPersistenceException; + + /** + * Deletes the given snapshot. + * + * @param bucketId the bucket id where the snapshot is located + * @param flowId the id of the versioned flow the snapshot belongs to + * @param version the version of the snapshot + * @throws FlowPersistenceException if the snapshot could not be deleted due to an error in underlying provider + */ + void deleteSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException; + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java new file mode 100644 index 0000000..c5e06f5 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.flow; + +/** + * The context that will be passed to the flow provider when saving a snapshot of a versioned flow. + */ +public interface FlowSnapshotContext { + + /** + * @return the id of the bucket this snapshot belongs to + */ + String getBucketId(); + + /** + * @return the name of the bucket this snapshot belongs to + */ + String getBucketName(); + + /** + * @return the id of the versioned flow this snapshot belongs to + */ + String getFlowId(); + + /** + * @return the name of the versioned flow this snapshot belongs to + */ + String getFlowName(); + + /** + * @return the version of the snapshot + */ + int getVersion(); + + /** + * @return the comments for the snapshot + */ + String getComments(); + + /** + * @return the timestamp the snapshot was created + */ + long getSnapshotTimestamp(); + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java new file mode 100644 index 0000000..558f7aa --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.metadata; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.provider.Provider; + +import java.util.Set; + +/** + * A service for managing metadata about all objects stored by the registry. + * + * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may + * change across releases until the registry matures. + */ +public interface MetadataProvider extends Provider { + + /** + * Creates the given bucket. + * + * @param bucket the bucket to create + * @return the created bucket + */ + Bucket createBucket(Bucket bucket); + + /** + * Retrieves the bucket with the given id. + * + * @param bucketIdentifier the id of the bucket to retrieve + * @return the bucket with the given id, or null if it does not exist + */ + Bucket getBucket(String bucketIdentifier); + + /** + * Updates the given bucket, only the name and description should be allowed to be updated. 
+ * + * @param bucket the updated bucket to save + * @return the updated bucket, or null if no bucket with the given id exists + */ + Bucket updateBucket(Bucket bucket); + + /** + * Deletes the bucket with the given identifier if one exists. + * + * @param bucketIdentifier the id of the bucket to delete + */ + void deleteBucket(String bucketIdentifier); + + /** + * Retrieves all buckets known to this metadata provider. + * + * @return the set of all buckets + */ + Set<Bucket> getBuckets(); + + /** + * Creates a versioned flow in the given bucket. + * + * @param bucketIdentifier the id of the bucket where the flow is being created + * @param flow the versioned flow to create + * @return the created versioned flow + * @throws IllegalStateException if no bucket with the given identifier exists + */ + VersionedFlow createFlow(String bucketIdentifier, VersionedFlow flow); + + /** + * Retrieves the versioned flow with the given id. + * + * @param flowIdentifier the identifier of the flow to retrieve + * @return the versioned flow with the given id, or null if no flow with the given id exists + */ + VersionedFlow getFlow(String flowIdentifier); + + /** + * Updates the given versioned flow, only the name and description should be allowed to be updated. + * + * @param versionedFlow the updated versioned flow to save + * @return the updated versioned flow + */ + VersionedFlow updateFlow(VersionedFlow versionedFlow); + + /** + * Deletes the versioned flow with the given identifier if one exists. + * + * @param flowIdentifier the id of the versioned flow to delete + */ + void deleteFlow(String flowIdentifier); + + /** + * Retrieves all versioned flows known to this metadata provider. + * + * @return the set of all versioned flows + */ + Set<VersionedFlow> getFlows(); + + /** + * Retrieves all the versioned flows for the given bucket. 
+ * + * @param bucketId the id of the bucket to retrieve flow for + * @return the set of versioned flows for the given bucket, or an empty set if none exist + */ + Set<VersionedFlow> getFlows(String bucketId); + + /** + * Creates a versioned flow snapshot. + * + * @param flowSnapshot the snapshot to create + * @return the created snapshot + * @throws IllegalStateException if the versioned flow specified by flowSnapshot.getFlowIdentifier() does not exist + */ + VersionedFlowSnapshot createFlowSnapshot(VersionedFlowSnapshot flowSnapshot); + + /** + * Retrieves the snapshot for the given flow identifier and snapshot version. + * + * @param flowIdentifier the identifier of the flow the snapshot belongs to + * @param version the version of the snapshot + * @return the versioned flow snapshot for the given flow identifier and version, or null if none exists + */ + VersionedFlowSnapshot getFlowSnapshot(String flowIdentifier, Integer version); + + /** + * Deletes the snapshot for the given flow identifier and version. + * + * @param flowIdentifier the identifier of the flow the snapshot belongs to + * @param version the version of the snapshot + */ + void deleteFlowSnapshot(String flowIdentifier, Integer version); + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java new file mode 100644 index 0000000..bce9352 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.metadata; + +/** + * An exception thrown when an error is encountered by a MetadataProvider. + */ +public class MetadataProviderException extends RuntimeException { + + public MetadataProviderException(String message) { + super(message); + } + + public MetadataProviderException(String message, Throwable cause) { + super(message, cause); + } + + public MetadataProviderException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java new file mode 100644 index 0000000..4a4be28 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +/** + * Base interface for providers. + */ +public interface Provider { + + /** + * Called to configure the Provider. + * + * @param configurationContext the context containing configuration for the given provider + * @throws ProviderCreationException if an error occurs while the provider is configured + */ + void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException; + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java new file mode 100644 index 0000000..b4f7ed6 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +import java.util.Map; + +/** + * A context that will be passed to providers in order to obtain configuration. + */ +public interface ProviderConfigurationContext { + + /** + * Retrieves all properties the provider currently understands regardless + * of whether a value has been set for them or not. If no value is present + * then its value is null and thus any registered default for the property + * descriptor applies. + * + * @return Map of all properties + */ + Map<String, String> getProperties(); + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java new file mode 100644 index 0000000..d1e106c --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +/** + * An exception that will be thrown if a provider can not be created. + */ +public class ProviderCreationException extends RuntimeException { + + public ProviderCreationException() { + } + + public ProviderCreationException(String message) { + super(message); + } + + public ProviderCreationException(String message, Throwable cause) { + super(message, cause); + } + + public ProviderCreationException(Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-impl/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-impl/pom.xml b/nifi-registry-provider-impl/pom.xml new file mode 100644 index 0000000..f5999af --- /dev/null +++ b/nifi-registry-provider-impl/pom.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>nifi-registry-provider-impl</artifactId> + <packaging>jar</packaging> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + <resource> + <directory>src/main/xsd</directory> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>jaxb2-maven-plugin</artifactId> + <executions> + <execution> + <id>current</id> + <goals> + <goal>xjc</goal> + </goals> + <configuration> + <packageName>org.apache.nifi.registry.metadata.generated</packageName> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <excludes>**/metadata/generated/*.java,</excludes> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-provider-api</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + 
<dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.7.22</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java new file mode 100644 index 0000000..fc14c14 --- /dev/null +++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.flow; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.util.FileUtils; +import org.apache.nifi.registry.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +/** + * A FlowPersistenceProvider that uses the local filesystem for storage. + */ +public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvider { + + static final Logger LOGGER = LoggerFactory.getLogger(FileSystemFlowPersistenceProvider.class); + + static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory"; + + static final String SNAPSHOT_EXTENSION = ".snapshot"; + + private File flowStorageDir; + + @Override + public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException { + final Map<String,String> props = configurationContext.getProperties(); + if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) { + throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided"); + } + + final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP); + if (StringUtils.isBlank(flowStorageDirValue)) { + throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank"); + } + + try { + flowStorageDir = new File(flowStorageDirValue); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir); + LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", new Object[] {flowStorageDir.getAbsolutePath()}); + } catch (IOException e) { + throw new ProviderCreationException(e); + } + } + + @Override + public 
synchronized void saveSnapshot(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException { + final File bucketDir = new File(flowStorageDir, context.getBucketId()); + try { + FileUtils.ensureDirectoryExistAndCanReadAndWrite(bucketDir); + } catch (IOException e) { + throw new FlowPersistenceException("Error accessing bucket directory at " + bucketDir.getAbsolutePath(), e); + } + + final File flowDir = new File(bucketDir, context.getFlowId()); + try { + FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowDir); + } catch (IOException e) { + throw new FlowPersistenceException("Error accessing flow directory at " + flowDir.getAbsolutePath(), e); + } + + final String versionString = String.valueOf(context.getVersion()); + final File versionDir = new File(flowDir, versionString); + try { + FileUtils.ensureDirectoryExistAndCanReadAndWrite(versionDir); + } catch (IOException e) { + throw new FlowPersistenceException("Error accessing version directory at " + versionDir.getAbsolutePath(), e); + } + + final File versionFile = new File(versionDir, versionString + SNAPSHOT_EXTENSION); + if (versionFile.exists()) { + throw new FlowPersistenceException("Unable to save, a snapshot already exists with version " + versionString); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Saving snapshot with filename {}", new Object[] {versionFile.getAbsolutePath()}); + } + + try (final OutputStream out = new FileOutputStream(versionFile)) { + out.write(content); + out.flush(); + } catch (Exception e) { + throw new FlowPersistenceException("Unable to write snapshot to disk due to " + e.getMessage(), e); + } + } + + @Override + public synchronized byte[] getSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException { + final File snapshotFile = getSnapshotFile(bucketId, flowId, version); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Retrieving snapshot with filename {}", new Object[] 
{snapshotFile.getAbsolutePath()}); + } + + if (!snapshotFile.exists()) { + return null; + } + + try (final InputStream in = new FileInputStream(snapshotFile)){ + return IOUtils.toByteArray(in); + } catch (IOException e) { + throw new FlowPersistenceException("Error reading snapshot file: " + snapshotFile.getAbsolutePath(), e); + } + } + + @Override + public synchronized void deleteSnapshots(final String bucketId, final String flowId) throws FlowPersistenceException { + final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId); + if (!flowDir.exists()) { + LOGGER.debug("Snapshot directory does not exist at {}", new Object[] {flowDir.getAbsolutePath()}); + return; + } + + try { + org.apache.commons.io.FileUtils.cleanDirectory(flowDir); + } catch (IOException e) { + throw new FlowPersistenceException("Error deleting snapshots at " + flowDir.getAbsolutePath(), e); + } + } + + @Override + public synchronized void deleteSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException { + final File snapshotFile = getSnapshotFile(bucketId, flowId, version); + if (!snapshotFile.exists()) { + LOGGER.debug("Snapshot file does not exist at {}", new Object[] {snapshotFile.getAbsolutePath()}); + return; + } + + final boolean deleted = snapshotFile.delete(); + if (!deleted) { + throw new FlowPersistenceException("Unable to delete snapshot at " + snapshotFile.getAbsolutePath()); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Deleted snapshot at {}", new Object[] {snapshotFile.getAbsolutePath()}); + } + } + + protected File getSnapshotFile(final String bucketId, final String flowId, final int version) { + final String snapshotFilename = bucketId + "/" + flowId + "/" + version + "/" + version + SNAPSHOT_EXTENSION; + return new File(flowStorageDir, snapshotFilename); + } + +} 
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java new file mode 100644 index 0000000..ccdc1a8 --- /dev/null +++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.metadata; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.metadata.generated.Buckets; +import org.apache.nifi.registry.metadata.generated.Flow; +import org.apache.nifi.registry.metadata.generated.Flows; +import org.apache.nifi.registry.metadata.generated.Metadata; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +import javax.xml.XMLConstants; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import java.io.File; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A MetadataProvider that persists metadata to the local filesystem. + */ +public class FileSystemMetadataProvider implements MetadataProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemMetadataProvider.class); + + private static final String METADATA_XSD = "/metadata.xsd"; + private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.metadata.generated"; + private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext(); + + /** + * Load the JAXBContext. 
+ */ + private static JAXBContext initializeJaxbContext() { + try { + return JAXBContext.newInstance(JAXB_GENERATED_PATH, FileSystemMetadataProvider.class.getClassLoader()); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext."); + } + } + + static final String METADATA_FILE_PROP = "Metadata File"; + + private File metadataFile; + private Schema metadataSchema; + private final AtomicReference<MetadataHolder> metadataHolder = new AtomicReference<>(null); + + public FileSystemMetadataProvider() throws ProviderCreationException { + try { + final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + metadataSchema = schemaFactory.newSchema(FileSystemMetadataProvider.class.getResource(METADATA_XSD)); + } catch (SAXException e) { + throw new ProviderCreationException("Unable to create MetadataProvider due to: " + e.getMessage(), e); + } + } + + @Override + public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException { + final Map<String,String> config = configurationContext.getProperties(); + if (!config.containsKey(METADATA_FILE_PROP)) { + throw new ProviderCreationException("The property " + METADATA_FILE_PROP + " must be provided"); + } + + final String metadataFileValue = config.get(METADATA_FILE_PROP); + if (StringUtils.isBlank(metadataFileValue)) { + throw new ProviderCreationException("The property " + METADATA_FILE_PROP + " cannot be null or blank"); + } + + try { + metadataFile = new File(metadataFileValue); + if (metadataFile.exists()) { + LOGGER.info("Loading metadata file from {}", new Object[] {metadataFile.getAbsolutePath()}); + final Metadata metadata = unmarshallMetadata(); + metadataHolder.set(new MetadataHolder(metadata)); + } else { + LOGGER.info("Creating new metadata file at {}", new Object[] {metadataFile.getAbsolutePath()}); + + final Metadata metadata = new Metadata(); + metadata.setBuckets(new Buckets()); + 
metadata.setFlows(new Flows()); + + saveMetadata(metadata); + metadataHolder.set(new MetadataHolder(metadata)); + } + } catch (Exception e) { + throw new ProviderCreationException("Unable to configure MetadataProvider due to: " + e.getMessage(), e); + } + } + + private Metadata unmarshallMetadata() throws JAXBException { + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + unmarshaller.setSchema(metadataSchema); + + final JAXBElement<Metadata> element = unmarshaller.unmarshal(new StreamSource(metadataFile), Metadata.class); + return element.getValue(); + } + + private void saveMetadata(final Metadata metadata) throws JAXBException { + final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); + marshaller.setSchema(metadataSchema); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); + marshaller.marshal(metadata, metadataFile); + } + + private synchronized void saveAndRefresh(final Metadata metadata) { + try { + saveMetadata(metadata); + metadataHolder.set(new MetadataHolder(metadata)); + } catch (JAXBException e) { + throw new MetadataProviderException("Unable to save metadata", e); + } + } + + @Override + public synchronized Bucket createBucket(final Bucket bucket) { + if (bucket == null) { + throw new IllegalArgumentException("Bucket cannot be null"); + } + + final org.apache.nifi.registry.metadata.generated.Bucket jaxbBucket = new org.apache.nifi.registry.metadata.generated.Bucket(); + jaxbBucket.setIdentifier(bucket.getIdentifier()); + jaxbBucket.setName(bucket.getName()); + jaxbBucket.setDescription(bucket.getDescription()); + jaxbBucket.setCreatedTimestamp(bucket.getCreatedTimestamp()); + + final MetadataHolder holder = metadataHolder.get(); + + final Metadata metadata = holder.getMetadata(); + metadata.getBuckets().getBucket().add(jaxbBucket); + + saveAndRefresh(metadata); + return metadataHolder.get().getBucketsBydId().get(bucket.getIdentifier()); + } + + @Override + public Bucket getBucket(final String 
bucketIdentifier) { + if (bucketIdentifier == null) { + throw new IllegalArgumentException("Bucket Identifier cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + return holder.getBucketsBydId().get(bucketIdentifier); + } + + @Override + public Set<Bucket> getBuckets() { + final MetadataHolder holder = metadataHolder.get(); + final Map<String,Bucket> bucketsBydId = holder.getBucketsBydId(); + return new HashSet<>(bucketsBydId.values()); + } + + @Override + public synchronized Bucket updateBucket(final Bucket bucket) { + if (bucket == null) { + throw new IllegalArgumentException("Bucket cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + final Buckets buckets = holder.getMetadata().getBuckets(); + + final org.apache.nifi.registry.metadata.generated.Bucket jaxbBucket = buckets.getBucket().stream() + .filter(b -> bucket.getIdentifier().equals(b.getIdentifier())) + .findFirst() + .orElse(null); + + if (jaxbBucket == null) { + return null; + } + + jaxbBucket.setName(bucket.getName()); + jaxbBucket.setDescription(bucket.getDescription()); + + saveAndRefresh(holder.getMetadata()); + return metadataHolder.get().getBucketsBydId().get(bucket.getIdentifier()); + } + + @Override + public synchronized void deleteBucket(final String bucketIdentifier) { + if (bucketIdentifier == null) { + throw new IllegalArgumentException("Bucket Identifier cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + final Flows flows = holder.getMetadata().getFlows(); + final Buckets buckets = holder.getMetadata().getBuckets(); + + // first remove any flow that reference the bucket + boolean deletedFlow = false; + final Iterator<Flow> flowIterator = flows.getFlow().iterator(); + while (flowIterator.hasNext()) { + final Flow flow = flowIterator.next(); + if (flow.getBucketIdentifier().equals(bucketIdentifier)) { + flowIterator.remove(); + deletedFlow = true; + } + } + + // now delete the actual bucket + boolean 
deleteBucket = false; + final Iterator<org.apache.nifi.registry.metadata.generated.Bucket> bucketIterator = buckets.getBucket().iterator(); + while (bucketIterator.hasNext()) { + final org.apache.nifi.registry.metadata.generated.Bucket bucket = bucketIterator.next(); + if (bucket.getIdentifier().equals(bucketIdentifier)) { + bucketIterator.remove(); + deleteBucket = true; + break; + } + } + + if (deletedFlow || deleteBucket) { + saveAndRefresh(holder.getMetadata()); + } + } + + @Override + public synchronized VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) { + if (bucketIdentifier == null) { + throw new IllegalArgumentException("Bucket Identifier cannot be blank"); + } + + if (versionedFlow == null) { + throw new IllegalArgumentException("Versioned Flow cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + + final Bucket bucket = holder.getBucketsBydId().get(bucketIdentifier); + if (bucket == null) { + throw new IllegalStateException("Unable to create Versioned Flow because Bucket does not exist with id " + bucketIdentifier); + } + + final Flow jaxbFlow = new Flow(); + jaxbFlow.setIdentifier(versionedFlow.getIdentifier()); + jaxbFlow.setName(versionedFlow.getName()); + jaxbFlow.setDescription(versionedFlow.getDescription()); + jaxbFlow.setCreatedTimestamp(versionedFlow.getCreatedTimestamp()); + jaxbFlow.setModifiedTimestamp(versionedFlow.getModifiedTimestamp()); + jaxbFlow.setBucketIdentifier(bucketIdentifier); + + final Metadata metadata = holder.getMetadata(); + metadata.getFlows().getFlow().add(jaxbFlow); + + saveAndRefresh(metadata); + return metadataHolder.get().getFlowsById().get(versionedFlow.getIdentifier()); + } + + @Override + public VersionedFlow getFlow(final String flowIdentifier) { + if (flowIdentifier == null) { + throw new IllegalArgumentException("Flow Identifier cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + return 
holder.getFlowsById().get(flowIdentifier); + } + + @Override + public Set<VersionedFlow> getFlows() { + final MetadataHolder holder = metadataHolder.get(); + final Map<String,VersionedFlow> flowsById = holder.getFlowsById(); + return new HashSet<>(flowsById.values()); + } + + @Override + public Set<VersionedFlow> getFlows(String bucketId) { + final MetadataHolder holder = metadataHolder.get(); + + final Map<String,Set<VersionedFlow>> flowsByBucket = holder.getFlowsByBucket(); + if (flowsByBucket.containsKey(bucketId)) { + return new HashSet<>(flowsByBucket.get(bucketId)); + } else { + return Collections.emptySet(); + } + } + + @Override + public synchronized VersionedFlow updateFlow(final VersionedFlow versionedFlow) { + if (versionedFlow == null) { + throw new IllegalArgumentException("Versioned Flow cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + final Flows flows = holder.getMetadata().getFlows(); + + final Flow jaxbFlow = flows.getFlow().stream() + .filter(f -> versionedFlow.getIdentifier().equals(f.getIdentifier())) + .findFirst() + .orElse(null); + + if (jaxbFlow == null) { + return null; + } + + // TODO should we allow changing the bucket id here, if so it needs to be passed in + jaxbFlow.setName(versionedFlow.getName()); + jaxbFlow.setDescription(versionedFlow.getDescription()); + jaxbFlow.setModifiedTimestamp(System.currentTimeMillis()); + + saveAndRefresh(holder.getMetadata()); + return metadataHolder.get().getFlowsById().get(versionedFlow.getIdentifier()); + } + + @Override + public synchronized void deleteFlow(final String flowIdentifier) { + if (flowIdentifier == null) { + throw new IllegalArgumentException("Flow Identifier cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + final Flows flows = holder.getMetadata().getFlows(); + + boolean deleted = false; + final Iterator<Flow> flowIter = flows.getFlow().iterator(); + + while (flowIter.hasNext()) { + final Flow jaxbFlow = flowIter.next(); + if 
(jaxbFlow.getIdentifier().equals(flowIdentifier)) { + flowIter.remove(); + deleted = true; + break; + } + } + + if (deleted) { + saveAndRefresh(holder.getMetadata()); + } + } + + @Override + public synchronized VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) { + if (flowSnapshot == null) { + throw new IllegalArgumentException("Versioned Flow Snapshot cannot be null"); + } + + final String flowIdentifier = flowSnapshot.getFlowIdentifier(); + final int snapshotVersion = flowSnapshot.getVersion(); + + final MetadataHolder holder = metadataHolder.get(); + final Flows flows = holder.getMetadata().getFlows(); + + final Flow jaxbFlow = flows.getFlow().stream() + .filter(f -> flowIdentifier.equals(f.getIdentifier())) + .findFirst() + .orElse(null); + + if (jaxbFlow == null) { + throw new IllegalStateException("Unable to create snapshot because Versioned Flow does not exist for id " + flowIdentifier); + } + + final Flow.Snapshot jaxbSnapshot = new Flow.Snapshot(); + jaxbSnapshot.setVersion(flowSnapshot.getVersion()); + jaxbSnapshot.setComments(flowSnapshot.getComments()); + jaxbSnapshot.setCreatedTimestamp(flowSnapshot.getTimestamp()); + + jaxbFlow.getSnapshot().add(jaxbSnapshot); + saveAndRefresh(holder.getMetadata()); + + final VersionedFlow versionedFlow = metadataHolder.get().getFlowsById().get(flowIdentifier); + return versionedFlow.getSnapshot(snapshotVersion); + } + + @Override + public VersionedFlowSnapshot getFlowSnapshot(final String flowIdentifier, final Integer version) { + if (flowIdentifier == null) { + throw new IllegalArgumentException("Flow Identifier cannot be null"); + } + + if (version == null) { + throw new IllegalArgumentException("Version cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + + final VersionedFlow versionedFlow = holder.getFlowsById().get(flowIdentifier); + if (versionedFlow == null) { + return null; + } + + return versionedFlow.getSnapshot(version); + } + + @Override + 
public synchronized void deleteFlowSnapshot(final String flowIdentifier, final Integer version) { + if (flowIdentifier == null) { + throw new IllegalArgumentException("Flow Identifier cannot be null"); + } + + if (version == null) { + throw new IllegalArgumentException("Version cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + final Flows flows = holder.getMetadata().getFlows(); + + final Flow jaxbFlow = flows.getFlow().stream() + .filter(f -> flowIdentifier.equals(f.getIdentifier())) + .findFirst() + .orElse(null); + + if (jaxbFlow == null) { + return; + } + + boolean deletedSnapshot = false; + final Iterator<Flow.Snapshot> snapshotIterator = jaxbFlow.getSnapshot().iterator(); + + while (snapshotIterator.hasNext()) { + final Flow.Snapshot snapshot = snapshotIterator.next(); + if (snapshot.getVersion().equals(version)) { + snapshotIterator.remove(); + deletedSnapshot = true; + break; + } + } + + if (deletedSnapshot) { + saveAndRefresh(holder.getMetadata()); + } + } + +}
