Repository: nifi-registry Updated Branches: refs/heads/master 05cef0775 -> c01f79a29
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java new file mode 100644 index 0000000..8c4d474 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.registry.serialization.jackson; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlType; +import java.util.Map; + +@XmlRootElement +@XmlType(propOrder = {"header", "content"}) +public class SerializationContainer<T> { + + private Map<String, String> header; + private T content; + + @ApiModelProperty("The serialization headers") + public Map<String, String> getHeader() { + return header; + } + + public void setHeader(Map<String, String> header) { + this.header = header; + } + + @ApiModelProperty("The serialized content") + public T getContent() { + return content; + } + + public void setContent(T content) { + this.content = content; + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java index 515de10..5290fb5 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java @@ -17,19 +17,25 @@ package org.apache.nifi.registry.serialization.jaxb; import org.apache.nifi.registry.serialization.SerializationException; -import org.apache.nifi.registry.serialization.Serializer; +import org.apache.nifi.registry.serialization.VersionedSerializer; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.Marshaller; import javax.xml.bind.Unmarshaller; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; 
+import java.nio.charset.StandardCharsets; /** * A Serializer that uses JAXB for serializing/deserializing. */ -public class JAXBSerializer<T> implements Serializer<T> { +public class JAXBSerializer<T> implements VersionedSerializer<T> { + + private static final String MAGIC_HEADER = "Flows"; + private static final byte[] MAGIC_HEADER_BYTES = MAGIC_HEADER.getBytes(StandardCharsets.UTF_8); private final JAXBContext jaxbContext; @@ -45,7 +51,7 @@ public class JAXBSerializer<T> implements Serializer<T> { } @Override - public void serialize(final T t, final OutputStream out) throws SerializationException { + public void serialize(final int dataModelVersion, final T t, final OutputStream out) throws SerializationException { if (t == null) { throw new IllegalArgumentException("The object to serialize cannot be null"); } @@ -54,6 +60,16 @@ public class JAXBSerializer<T> implements Serializer<T> { throw new IllegalArgumentException("OutputStream cannot be null"); } + final ByteBuffer byteBuffer = ByteBuffer.allocate(9); + byteBuffer.put(MAGIC_HEADER_BYTES); + byteBuffer.putInt(dataModelVersion); + + try { + out.write(byteBuffer.array()); + } catch (final IOException e) { + throw new SerializationException("Unable to write header while serializing process group", e); + } + try { final Marshaller marshaller = jaxbContext.createMarshaller(); marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); @@ -70,6 +86,8 @@ public class JAXBSerializer<T> implements Serializer<T> { } try { + // Consume the header bytes. 
+ readDataModelVersion(input); final Unmarshaller unmarshaller = jaxbContext.createUnmarshaller(); return (T) unmarshaller.unmarshal(input); } catch (JAXBException e) { @@ -77,4 +95,33 @@ public class JAXBSerializer<T> implements Serializer<T> { } } + @Override + public int readDataModelVersion(InputStream input) throws SerializationException { + final int headerLength = 9; + final byte[] buffer = new byte[headerLength]; + + int bytesRead = -1; + try { + bytesRead = input.read(buffer, 0, headerLength); + } catch (final IOException e) { + throw new SerializationException("Unable to read header while deserializing process group", e); + } + + if (bytesRead < headerLength) { + throw new SerializationException("Unable to read header while deserializing process group, expected" + + headerLength + " bytes, but found " + bytesRead); + } + + final ByteBuffer bb = ByteBuffer.wrap(buffer); + final byte[] magicHeaderBytes = new byte[MAGIC_HEADER_BYTES.length]; + bb.get(magicHeaderBytes); + for (int i = 0; i < MAGIC_HEADER_BYTES.length; i++) { + if (MAGIC_HEADER_BYTES[i] != magicHeaderBytes[i]) { + throw new SerializationException("Unable to read header while deserializing process group." 
+ + " Header byte sequence does not match"); + } + } + + return bb.getInt(MAGIC_HEADER_BYTES.length); + } } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider index 6d4fdfc..e456fa2 100644 --- a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider +++ b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider \ No newline at end of file +org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider +org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java new file mode 100644 index 0000000..45351ab --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.provider.flow.git; + +import org.apache.nifi.registry.flow.FlowPersistenceException; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.provider.StandardProviderConfigurationContext; +import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; +import org.apache.nifi.registry.util.FileUtils; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.lib.StoredConfig; +import org.eclipse.jgit.revwalk.RevCommit; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestGitFlowPersistenceProvider { + + private static final Logger logger = LoggerFactory.getLogger(TestGitFlowPersistenceProvider.class); + + private void assertCreationFailure(final Map<String, String> properties, final Consumer<ProviderCreationException> assertion) { + final GitFlowPersistenceProvider persistenceProvider = new GitFlowPersistenceProvider(); + + try { + final ProviderConfigurationContext configurationContext = new StandardProviderConfigurationContext(properties); + persistenceProvider.onConfigured(configurationContext); + fail("Should fail"); + } catch (ProviderCreationException e) { + assertion.accept(e); + } + } + + @Test + public void testNoFlowStorageDirSpecified() { + final Map<String, String> properties = new HashMap<>(); + assertCreationFailure(properties, + e -> assertEquals("The property Flow Storage Directory must be provided", e.getMessage())); + } + + @Test + public void 
testLoadNonExistingDir() { + final Map<String, String> properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/non-existing"); + assertCreationFailure(properties, + e -> assertEquals("'target/non-existing' is not a directory or does not exist.", e.getCause().getMessage())); + } + + @Test + public void testLoadNonGitDir() { + final Map<String, String> properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target"); + assertCreationFailure(properties, + e -> assertEquals("Directory 'target' does not contain a .git directory." + + " Please init and configure the directory with 'git init' command before using it from NiFi Registry.", + e.getCause().getMessage())); + } + + @FunctionalInterface + private interface GitConsumer { + void accept(Git git) throws GitAPIException; + } + + private void assertProvider(final Map<String, String> properties, final GitConsumer gitConsumer, final Consumer<GitFlowPersistenceProvider> assertion, boolean deleteDir) + throws IOException, GitAPIException { + + final File gitDir = new File(properties.get(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP)); + try { + FileUtils.ensureDirectoryExistAndCanReadAndWrite(gitDir); + + try (final Git git = Git.init().setDirectory(gitDir).call()) { + logger.debug("Initiated a git repository {}", git); + final StoredConfig config = git.getRepository().getConfig(); + config.setString("user", null, "name", "git-user"); + config.setString("user", null, "email", "[email protected]"); + config.save(); + gitConsumer.accept(git); + } + + final GitFlowPersistenceProvider persistenceProvider = new GitFlowPersistenceProvider(); + + final ProviderConfigurationContext configurationContext = new StandardProviderConfigurationContext(properties); + persistenceProvider.onConfigured(configurationContext); + assertion.accept(persistenceProvider); + + } finally { + if (deleteDir) { + FileUtils.deleteFile(gitDir, true); + } + } + } 
+ + @Test + public void testLoadEmptyGitDir() throws GitAPIException, IOException { + final Map<String, String> properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/empty-git"); + + assertProvider(properties, g -> {}, p -> { + try { + p.getFlowContent("bucket-id-A", "flow-id-1", 1); + } catch (FlowPersistenceException e) { + assertEquals("Bucket ID bucket-id-A was not found.", e.getMessage()); + } + }, true); + } + + @Test + public void testLoadCommitHistories() throws GitAPIException, IOException { + final Map<String, String> properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/repo-with-histories"); + + assertProvider(properties, g -> {}, p -> { + // Create some Flows and keep the directory. + final StandardFlowSnapshotContext.Builder contextBuilder = new StandardFlowSnapshotContext.Builder() + .bucketId("bucket-id-A") + .bucketName("C'est/Bucket A/です。") + .flowId("flow-id-1") + .flowName("テスト_用/フロー#1\\[contains invalid chars]") + .author("unit-test-user") + .comments("Initial commit.") + .snapshotTimestamp(new Date().getTime()) + .version(1); + + final byte[] flow1Ver1 = "Flow1 ver.1".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow1Ver1); + + contextBuilder.comments("2nd commit.").version(2); + final byte[] flow1Ver2 = "Flow1 ver.2".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow1Ver2); + + // Rename flow. + contextBuilder.flowName("FlowOne").comments("3rd commit.").version(3); + final byte[] flow1Ver3 = "FlowOne ver.3".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow1Ver3); + + // Adding another flow. + contextBuilder.flowId("flow-id-2").flowName("FlowTwo").comments("4th commit.").version(1); + final byte[] flow2Ver1 = "FlowTwo ver.1".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow2Ver1); + + // Rename bucket.
+ contextBuilder.bucketName("New name for Bucket A").comments("5th commit.").version(2); + final byte[] flow2Ver2 = "FlowTwo ver.2".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow2Ver2); + + + }, false); + + assertProvider(properties, g -> { + // Assert commit. + final AtomicInteger commitCount = new AtomicInteger(0); + final String[] commitMessages = { + "5th commit.\n\nBy NiFi Registry user: unit-test-user", + "4th commit.\n\nBy NiFi Registry user: unit-test-user", + "3rd commit.\n\nBy NiFi Registry user: unit-test-user", + "2nd commit.\n\nBy NiFi Registry user: unit-test-user", + "Initial commit.\n\nBy NiFi Registry user: unit-test-user" + }; + for (RevCommit commit : g.log().call()) { + assertEquals("git-user", commit.getAuthorIdent().getName()); + final int commitIndex = commitCount.getAndIncrement(); + assertEquals(commitMessages[commitIndex], commit.getFullMessage()); + } + assertEquals(commitMessages.length, commitCount.get()); + }, p -> { + // Should be able to load flow from commit histories. + final byte[] flow1Ver1 = p.getFlowContent("bucket-id-A", "flow-id-1", 1); + assertEquals("Flow1 ver.1", new String(flow1Ver1, StandardCharsets.UTF_8)); + + final byte[] flow1Ver2 = p.getFlowContent("bucket-id-A", "flow-id-1", 2); + assertEquals("Flow1 ver.2", new String(flow1Ver2, StandardCharsets.UTF_8)); + + // Even if the name of flow has been changed, it can be retrieved by the same flow id. + final byte[] flow1Ver3 = p.getFlowContent("bucket-id-A", "flow-id-1", 3); + assertEquals("FlowOne ver.3", new String(flow1Ver3, StandardCharsets.UTF_8)); + + final byte[] flow2Ver1 = p.getFlowContent("bucket-id-A", "flow-id-2", 1); + assertEquals("FlowTwo ver.1", new String(flow2Ver1, StandardCharsets.UTF_8)); + + // Even if the name of bucket has been changed, it can be retrieved by the same flow id. 
+ final byte[] flow2Ver2 = p.getFlowContent("bucket-id-A", "flow-id-2", 2); + assertEquals("FlowTwo ver.2", new String(flow2Ver2, StandardCharsets.UTF_8)); + + // Delete the 2nd flow. + p.deleteAllFlowContent("bucket-id-A", "flow-id-2"); + + }, false); + + assertProvider(properties, g -> { + // Assert commit. + final AtomicInteger commitCount = new AtomicInteger(0); + final String[] commitMessages = { + "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket New_name_for_Bucket_A:bucket-id-A.", + "5th commit.", + "4th commit.", + "3rd commit.", + "2nd commit.", + "Initial commit." + }; + for (RevCommit commit : g.log().call()) { + assertEquals("git-user", commit.getAuthorIdent().getName()); + final int commitIndex = commitCount.getAndIncrement(); + assertEquals(commitMessages[commitIndex], commit.getShortMessage()); + } + assertEquals(commitMessages.length, commitCount.get()); + }, p -> { + // Should be able to load flow from commit histories. + final byte[] flow1Ver1 = p.getFlowContent("bucket-id-A", "flow-id-1", 1); + assertEquals("Flow1 ver.1", new String(flow1Ver1, StandardCharsets.UTF_8)); + + final byte[] flow1Ver2 = p.getFlowContent("bucket-id-A", "flow-id-1", 2); + assertEquals("Flow1 ver.2", new String(flow1Ver2, StandardCharsets.UTF_8)); + + // Even if the name of flow has been changed, it can be retrieved by the same flow id. + final byte[] flow1Ver3 = p.getFlowContent("bucket-id-A", "flow-id-1", 3); + assertEquals("FlowOne ver.3", new String(flow1Ver3, StandardCharsets.UTF_8)); + + // The 2nd flow has been deleted, and should not exist. 
+ try { + p.getFlowContent("bucket-id-A", "flow-id-2", 1); + } catch (FlowPersistenceException e) { + assertEquals("Flow ID flow-id-2 was not found in bucket New_name_for_Bucket_A:bucket-id-A.", e.getMessage()); + } + + try { + p.getFlowContent("bucket-id-A", "flow-id-2", 2); + } catch (FlowPersistenceException e) { + assertEquals("Flow ID flow-id-2 was not found in bucket New_name_for_Bucket_A:bucket-id-A.", e.getMessage()); + } + + // Delete the 1st flow, too. + p.deleteAllFlowContent("bucket-id-A", "flow-id-1"); + + }, false); + + assertProvider(properties, g -> { + // Assert commit. + final AtomicInteger commitCount = new AtomicInteger(0); + final String[] commitMessages = { + "Deleted flow FlowOne.snapshot:flow-id-1 in bucket New_name_for_Bucket_A:bucket-id-A.", + "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket New_name_for_Bucket_A:bucket-id-A.", + "5th commit.", + "4th commit.", + "3rd commit.", + "2nd commit.", + "Initial commit." + }; + for (RevCommit commit : g.log().call()) { + assertEquals("git-user", commit.getAuthorIdent().getName()); + final int commitIndex = commitCount.getAndIncrement(); + assertEquals(commitMessages[commitIndex], commit.getShortMessage()); + } + assertEquals(commitMessages.length, commitCount.get()); + }, p -> { + // The 1st flow has been deleted, and should not exist. Moreover, the bucket A has been deleted since there's no flow. 
+ try { + p.getFlowContent("bucket-id-A", "flow-id-1", 1); + } catch (FlowPersistenceException e) { + assertEquals("Bucket ID bucket-id-A was not found.", e.getMessage()); + } + }, true); + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java index 75c76b7..584e2f7 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java @@ -23,6 +23,11 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class TestVersionedProcessGroupSerializer { @@ -57,4 +62,69 @@ public class TestVersionedProcessGroupSerializer { Assert.assertEquals(processor1.getName(), deserializedProcessor1.getName()); } + + @Test + public void testDeserializeJsonNonIntegerVersion() throws IOException { + final String file = "/serialization/json/non-integer-version.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + try { + serializer.deserialize(is); + fail("Should fail"); + } catch (SerializationException e) { + assertEquals("Unable to find a process group serializer compatible with the input.", e.getMessage()); + } + } + } + + @Test + 
public void testDeserializeJsonNoVersion() throws IOException { + final String file = "/serialization/json/no-version.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + try { + serializer.deserialize(is); + fail("Should fail"); + } catch (SerializationException e) { + assertEquals("Unable to find a process group serializer compatible with the input.", e.getMessage()); + } + } + } + + @Test + public void testDeserializeVer1() throws IOException { + final String file = "/serialization/ver1.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + final VersionedProcessGroup processGroup; + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + processGroup = serializer.deserialize(is); + } + System.out.printf("processGroup=" + processGroup); + } + + @Test + public void testDeserializeVer2() throws IOException { + final String file = "/serialization/ver2.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + final VersionedProcessGroup processGroup; + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + processGroup = serializer.deserialize(is); + } + System.out.printf("processGroup=" + processGroup); + } + + @Test + public void testDeserializeVer3() throws IOException { + final String file = "/serialization/ver3.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + try { + serializer.deserialize(is); + fail("Should fail"); + } catch (SerializationException e) { + assertEquals("Unable to find a process group serializer compatible with the input.", e.getMessage()); + } + } + } + } 
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java index 1ea9e7c..916e053 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java @@ -19,7 +19,7 @@ package org.apache.nifi.registry.serialization.jaxb; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; import org.apache.nifi.registry.serialization.SerializationException; -import org.apache.nifi.registry.serialization.Serializer; +import org.apache.nifi.registry.serialization.VersionedSerializer; import org.junit.Assert; import org.junit.Test; @@ -31,7 +31,7 @@ public class TestJAXBVersionedProcessGroupSerializer { @Test public void testSerializeDeserializeFlowSnapshot() throws SerializationException { - final Serializer<VersionedProcessGroup> serializer = new JAXBVersionedProcessGroupSerializer(); + final VersionedSerializer<VersionedProcessGroup> serializer = new JAXBVersionedProcessGroupSerializer(); final VersionedProcessGroup processGroup1 = new VersionedProcessGroup(); processGroup1.setIdentifier("pg1"); @@ -45,12 +45,18 @@ public class TestJAXBVersionedProcessGroupSerializer { processGroup1.getProcessors().add(processor1); final ByteArrayOutputStream out = new ByteArrayOutputStream(); - serializer.serialize(processGroup1, out); + serializer.serialize(1, 
processGroup1, out); final String snapshotStr = new String(out.toByteArray(), StandardCharsets.UTF_8); //System.out.println(snapshotStr); final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + in.mark(1024); + final int version = serializer.readDataModelVersion(in); + + Assert.assertEquals(1, version); + + in.reset(); final VersionedProcessGroup deserializedProcessGroup1 = serializer.deserialize(in); Assert.assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup1.getIdentifier()); http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot b/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot new file mode 100644 index 0000000..ce1901f --- /dev/null +++ b/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot @@ -0,0 +1,5 @@ +{ + "header": { + }, + "content": {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot b/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot new file mode 100644 index 0000000..33d4da3 --- /dev/null +++ b/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot @@ -0,0 +1,6 @@ +{ + "header": { + "dataModelVersion": "One" + }, + "content": {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot 
---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot b/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot new file mode 100644 index 0000000..7c1ab49 Binary files /dev/null and b/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot differ http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot b/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot new file mode 100644 index 0000000..7f4dfc5 --- /dev/null +++ b/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot @@ -0,0 +1,97 @@ +{ + "header": { + "dataModelVersion": 2 + }, + "content": { + "identifier": "a2c80883-171c-316d-ba25-24df2c352693", + "name": "Flow1", + "comments": "", + "position": { + "x": 1549.249149182042, + "y": 764.2426186568309 + }, + "processGroups": [], + "remoteProcessGroups": [], + "processors": [ + { + "identifier": "92fe4513-21c0-34f6-a916-2874f46ae864", + "name": "GenerateFlowFile", + "comments": "", + "position": { + "x": 488.99999411591034, + "y": 114.00000359389122 + }, + "bundle": { + "group": "org.apache.nifi", + "artifact": "nifi-standard-nar", + "version": "1.6.0-SNAPSHOT" + }, + "style": {}, + "type": "org.apache.nifi.processors.standard.GenerateFlowFile", + "properties": { + "character-set": "UTF-8", + "File Size": "0B", + "Batch Size": "1", + "Unique FlowFiles": "false", + "Data Format": "Text" + }, + "propertyDescriptors": { + "character-set": { + "name": "character-set", + "displayName": "Character Set", + "identifiesControllerService": false, + "sensitive": false + }, + "File Size": { + "name": "File Size", + "displayName": "File Size", + "identifiesControllerService": 
false, + "sensitive": false + }, + "generate-ff-custom-text": { + "name": "generate-ff-custom-text", + "displayName": "Custom Text", + "identifiesControllerService": false, + "sensitive": false + }, + "Batch Size": { + "name": "Batch Size", + "displayName": "Batch Size", + "identifiesControllerService": false, + "sensitive": false + }, + "Unique FlowFiles": { + "name": "Unique FlowFiles", + "displayName": "Unique FlowFiles", + "identifiesControllerService": false, + "sensitive": false + }, + "Data Format": { + "name": "Data Format", + "displayName": "Data Format", + "identifiesControllerService": false, + "sensitive": false + } + }, + "schedulingPeriod": "0 sec", + "schedulingStrategy": "TIMER_DRIVEN", + "executionNode": "ALL", + "penaltyDuration": "30 sec", + "yieldDuration": "1 sec", + "bulletinLevel": "WARN", + "runDurationMillis": 0, + "concurrentlySchedulableTaskCount": 1, + "componentType": "PROCESSOR", + "groupIdentifier": "a2c80883-171c-316d-ba25-24df2c352693" + } + ], + "inputPorts": [], + "outputPorts": [], + "connections": [], + "labels": [], + "funnels": [], + "controllerServices": [], + "variables": {}, + "componentType": "PROCESS_GROUP" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot b/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot new file mode 100644 index 0000000..574fe56 --- /dev/null +++ b/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot @@ -0,0 +1,6 @@ +{ + "header": { + "dataModelVersion": 3 + }, + "content": {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java 
---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java index c5e06f5..c4bdd46 100644 --- a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java @@ -56,4 +56,9 @@ public interface FlowSnapshotContext { */ long getSnapshotTimestamp(); + /** + * @return the name of the user who created the snapshot + */ + String getAuthor(); + } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-resources/src/main/resources/conf/providers.xml ---------------------------------------------------------------------- diff --git a/nifi-registry-resources/src/main/resources/conf/providers.xml b/nifi-registry-resources/src/main/resources/conf/providers.xml index 40bf012..720bee2 100644 --- a/nifi-registry-resources/src/main/resources/conf/providers.xml +++ b/nifi-registry-resources/src/main/resources/conf/providers.xml @@ -20,4 +20,13 @@ <property name="Flow Storage Directory">./flow_storage</property> </flowPersistenceProvider> + <!-- + <flowPersistenceProvider> + <class>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</class> + <property name="Flow Storage Directory">./flow_storage</property> + <property name="Remote To Push"></property> + <property name="Remote Access User"></property> + <property name="Remote Access Password"></property> + </flowPersistenceProvider> + --> </providers> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java ---------------------------------------------------------------------- diff --git 
a/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java b/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java index b7476b9..5abaf7e 100644 --- a/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java +++ b/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java @@ -389,4 +389,38 @@ public class FileUtils { /* do nothing */ } } + + + // The invalid character list is derived from this Stackoverflow page. + // https://stackoverflow.com/questions/1155107/is-there-a-cross-platform-java-method-to-remove-filename-special-chars + private final static int[] INVALID_CHARS = {34, 60, 62, 124, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, + 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 58, 42, 63, 92, 47, 32}; + + static { + Arrays.sort(INVALID_CHARS); + } + + /** + * Replaces invalid characters for a file system name within a given filename string to underscore '_'. + * Be careful not to pass a file path as this method replaces path delimiter characters (i.e forward/back slashes). 
+ * @param filename The filename to clean + * @return sanitized filename + */ + public static String sanitizeFilename(String filename) { + if (filename == null || filename.isEmpty()) { + return filename; + } + int codePointCount = filename.codePointCount(0, filename.length()); + + final StringBuilder cleanName = new StringBuilder(); + for (int i = 0; i < codePointCount; i++) { + int c = filename.codePointAt(i); + if (Arrays.binarySearch(INVALID_CHARS, c) < 0) { + cleanName.appendCodePoint(c); + } else { + cleanName.append('_'); + } + } + return cleanName.toString(); + } } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java ---------------------------------------------------------------------- diff --git a/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java b/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java new file mode 100644 index 0000000..d4bc963 --- /dev/null +++ b/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.registry.util; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestFileUtils { + @Test + public void testSanitizeFilename() { + String filename = "This / is / a test"; + final String sanitizedFilename = FileUtils.sanitizeFilename(filename); + assertEquals("This___is___a_test", sanitizedFilename); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java index a2174ba..10a044a 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java @@ -16,10 +16,7 @@ */ package org.apache.nifi.registry.web.mapper; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import org.apache.nifi.registry.serialization.jackson.ObjectMapperProvider; import org.glassfish.jersey.jackson.internal.jackson.jaxrs.json.JacksonJaxbJsonProvider; import org.springframework.stereotype.Component; @@ -32,17 +29,8 @@ import javax.ws.rs.ext.Provider; @Produces(MediaType.APPLICATION_JSON) public class NiFiRegistryJsonProvider extends JacksonJaxbJsonProvider { - private static final ObjectMapper mapper = new ObjectMapper(); - - static { - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - 
mapper.setPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL)); - mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(mapper.getTypeFactory())); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - public NiFiRegistryJsonProvider() { super(); - setMapper(mapper); + setMapper(ObjectMapperProvider.getMapper()); } }
