Repository: nifi-registry
Updated Branches:
  refs/heads/master 05cef0775 -> c01f79a29


http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java
 
b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java
new file mode 100644
index 0000000..8c4d474
--- /dev/null
+++ 
b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.serialization.jackson;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlType;
+import java.util.Map;
+
+@XmlRootElement
+@XmlType(propOrder = {"header", "content"})
+public class SerializationContainer<T> {
+
+    private Map<String, String> header;
+    private T content;
+
+    @ApiModelProperty("The serialization headers")
+    public Map<String, String> getHeader() {
+        return header;
+    }
+
+    public void setHeader(Map<String, String> header) {
+        this.header = header;
+    }
+
+    @ApiModelProperty("The serialized content")
+    public T getContent() {
+        return content;
+    }
+
+    public void setContent(T content) {
+        this.content = content;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
 
b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
index 515de10..5290fb5 100644
--- 
a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
+++ 
b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
@@ -17,19 +17,25 @@
 package org.apache.nifi.registry.serialization.jaxb;
 
 import org.apache.nifi.registry.serialization.SerializationException;
-import org.apache.nifi.registry.serialization.Serializer;
+import org.apache.nifi.registry.serialization.VersionedSerializer;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 /**
  * A Serializer that uses JAXB for serializing/deserializing.
  */
-public class JAXBSerializer<T> implements Serializer<T> {
+public class JAXBSerializer<T> implements VersionedSerializer<T> {
+
+    private static final String MAGIC_HEADER = "Flows";
+    private static final byte[] MAGIC_HEADER_BYTES = 
MAGIC_HEADER.getBytes(StandardCharsets.UTF_8);
 
     private final JAXBContext jaxbContext;
 
@@ -45,7 +51,7 @@ public class JAXBSerializer<T> implements Serializer<T> {
     }
 
     @Override
-    public void serialize(final T t, final OutputStream out) throws 
SerializationException {
+    public void serialize(final int dataModelVersion, final T t, final 
OutputStream out) throws SerializationException {
         if (t == null) {
             throw new IllegalArgumentException("The object to serialize cannot 
be null");
         }
@@ -54,6 +60,16 @@ public class JAXBSerializer<T> implements Serializer<T> {
             throw new IllegalArgumentException("OutputStream cannot be null");
         }
 
+        final ByteBuffer byteBuffer = ByteBuffer.allocate(9);
+        byteBuffer.put(MAGIC_HEADER_BYTES);
+        byteBuffer.putInt(dataModelVersion);
+
+        try {
+            out.write(byteBuffer.array());
+        } catch (final IOException e) {
+            throw new SerializationException("Unable to write header while 
serializing process group", e);
+        }
+
         try {
             final Marshaller marshaller = jaxbContext.createMarshaller();
             marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
@@ -70,6 +86,8 @@ public class JAXBSerializer<T> implements Serializer<T> {
         }
 
         try {
+            // Consume the header bytes.
+            readDataModelVersion(input);
             final Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
             return (T) unmarshaller.unmarshal(input);
         } catch (JAXBException e) {
@@ -77,4 +95,33 @@ public class JAXBSerializer<T> implements Serializer<T> {
         }
     }
 
+    @Override
+    public int readDataModelVersion(InputStream input) throws 
SerializationException {
+        final int headerLength = 9;
+        final byte[] buffer = new byte[headerLength];
+
+        int bytesRead = -1;
+        try {
+            bytesRead = input.read(buffer, 0, headerLength);
+        } catch (final IOException e) {
+            throw new SerializationException("Unable to read header while 
deserializing process group", e);
+        }
+
+        if (bytesRead < headerLength) {
+            throw new SerializationException("Unable to read header while 
deserializing process group, expected"
+                    + headerLength + " bytes, but found " + bytesRead);
+        }
+
+        final ByteBuffer bb = ByteBuffer.wrap(buffer);
+        final byte[] magicHeaderBytes = new byte[MAGIC_HEADER_BYTES.length];
+        bb.get(magicHeaderBytes);
+        for (int i = 0; i < MAGIC_HEADER_BYTES.length; i++) {
+            if (MAGIC_HEADER_BYTES[i] != magicHeaderBytes[i]) {
+                throw new SerializationException("Unable to read header while 
deserializing process group." +
+                        " Header byte sequence does not match");
+            }
+        }
+
+        return bb.getInt(MAGIC_HEADER_BYTES.length);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
 
b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
index 6d4fdfc..e456fa2 100644
--- 
a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
+++ 
b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
\ No newline at end of file
+org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
+org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java
 
b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java
new file mode 100644
index 0000000..45351ab
--- /dev/null
+++ 
b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.provider.StandardProviderConfigurationContext;
+import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext;
+import org.apache.nifi.registry.util.FileUtils;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.lib.StoredConfig;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestGitFlowPersistenceProvider {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TestGitFlowPersistenceProvider.class);
+
+    private void assertCreationFailure(final Map<String, String> properties, 
final Consumer<ProviderCreationException> assertion) {
+        final GitFlowPersistenceProvider persistenceProvider = new 
GitFlowPersistenceProvider();
+
+        try {
+            final ProviderConfigurationContext configurationContext = new 
StandardProviderConfigurationContext(properties);
+            persistenceProvider.onConfigured(configurationContext);
+            fail("Should fail");
+        } catch (ProviderCreationException e) {
+            assertion.accept(e);
+        }
+    }
+
+    @Test
+    public void testNoFlowStorageDirSpecified() {
+        final Map<String, String> properties = new HashMap<>();
+        assertCreationFailure(properties,
+                e -> assertEquals("The property Flow Storage Directory must be 
provided", e.getMessage()));
+    }
+
+    @Test
+    public void testLoadNonExistingDir() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, 
"target/non-existing");
+        assertCreationFailure(properties,
+                e -> assertEquals("'target/non-existing' is not a directory or 
does not exist.", e.getCause().getMessage()));
+    }
+
+    @Test
+    public void testLoadNonGitDir() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, 
"target");
+        assertCreationFailure(properties,
+                e -> assertEquals("Directory 'target' does not contain a .git 
directory." +
+                                " Please init and configure the directory with 
'git init' command before using it from NiFi Registry.",
+                        e.getCause().getMessage()));
+    }
+
+    @FunctionalInterface
+    private interface GitConsumer {
+        void accept(Git git) throws GitAPIException;
+    }
+
+    private void assertProvider(final Map<String, String> properties, final 
GitConsumer gitConsumer, final Consumer<GitFlowPersistenceProvider> assertion, 
boolean deleteDir)
+            throws IOException, GitAPIException {
+
+        final File gitDir = new 
File(properties.get(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP));
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(gitDir);
+
+            try (final Git git = Git.init().setDirectory(gitDir).call()) {
+                logger.debug("Initiated a git repository {}", git);
+                final StoredConfig config = git.getRepository().getConfig();
+                config.setString("user", null, "name", "git-user");
+                config.setString("user", null, "email", 
"[email protected]");
+                config.save();
+                gitConsumer.accept(git);
+            }
+
+            final GitFlowPersistenceProvider persistenceProvider = new 
GitFlowPersistenceProvider();
+
+            final ProviderConfigurationContext configurationContext = new 
StandardProviderConfigurationContext(properties);
+            persistenceProvider.onConfigured(configurationContext);
+            assertion.accept(persistenceProvider);
+
+        } finally {
+            if (deleteDir) {
+                FileUtils.deleteFile(gitDir, true);
+            }
+        }
+    }
+
+    @Test
+    public void testLoadEmptyGitDir() throws GitAPIException, IOException {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, 
"target/empty-git");
+
+        assertProvider(properties, g -> {}, p -> {
+            try {
+                p.getFlowContent("bucket-id-A", "flow-id-1", 1);
+            } catch (FlowPersistenceException e) {
+                assertEquals("Bucket ID bucket-id-A was not found.", 
e.getMessage());
+            }
+        }, true);
+    }
+
+    @Test
+    public void testLoadCommitHistories() throws GitAPIException, IOException {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, 
"target/repo-with-histories");
+
+        assertProvider(properties, g -> {}, p -> {
+            // Create some Flows and keep the directory.
+            final StandardFlowSnapshotContext.Builder contextBuilder = new 
StandardFlowSnapshotContext.Builder()
+                    .bucketId("bucket-id-A")
+                    .bucketName("C'est/Bucket A/です。")
+                    .flowId("flow-id-1")
+                    .flowName("テスト_用/フロー#1\\[contains invalid 
chars]")
+                    .author("unit-test-user")
+                    .comments("Initial commit.")
+                    .snapshotTimestamp(new Date().getTime())
+                    .version(1);
+
+            final byte[] flow1Ver1 = "Flow1 
ver.1".getBytes(StandardCharsets.UTF_8);
+            p.saveFlowContent(contextBuilder.build(), flow1Ver1);
+
+            contextBuilder.comments("2nd commit.").version(2);
+            final byte[] flow1Ver2 = "Flow1 
ver.2".getBytes(StandardCharsets.UTF_8);
+            p.saveFlowContent(contextBuilder.build(), flow1Ver2);
+
+            // Rename flow.
+            contextBuilder.flowName("FlowOne").comments("3rd 
commit.").version(3);
+            final byte[] flow1Ver3 = "FlowOne 
ver.3".getBytes(StandardCharsets.UTF_8);
+            p.saveFlowContent(contextBuilder.build(), flow1Ver3);
+
+            // Adding another flow.
+            
contextBuilder.flowId("flow-id-2").flowName("FlowTwo").comments("4th 
commit.").version(1);
+            final byte[] flow2Ver1 = "FlowTwo 
ver.1".getBytes(StandardCharsets.UTF_8);
+            p.saveFlowContent(contextBuilder.build(), flow2Ver1);
+
+            // Rename bucket.
+            contextBuilder.bucketName("New name for Bucket A").comments("5th 
commit.").version(2);
+            final byte[] flow2Ver2 = "FlowTwo 
ver.2".getBytes(StandardCharsets.UTF_8);
+            p.saveFlowContent(contextBuilder.build(), flow2Ver2);
+
+
+        }, false);
+
+        assertProvider(properties, g -> {
+            // Assert commit.
+            final AtomicInteger commitCount = new AtomicInteger(0);
+            final String[] commitMessages = {
+                    "5th commit.\n\nBy NiFi Registry user: unit-test-user",
+                    "4th commit.\n\nBy NiFi Registry user: unit-test-user",
+                    "3rd commit.\n\nBy NiFi Registry user: unit-test-user",
+                    "2nd commit.\n\nBy NiFi Registry user: unit-test-user",
+                    "Initial commit.\n\nBy NiFi Registry user: unit-test-user"
+            };
+            for (RevCommit commit : g.log().call()) {
+                assertEquals("git-user", commit.getAuthorIdent().getName());
+                final int commitIndex = commitCount.getAndIncrement();
+                assertEquals(commitMessages[commitIndex], 
commit.getFullMessage());
+            }
+            assertEquals(commitMessages.length, commitCount.get());
+        }, p -> {
+            // Should be able to load flow from commit histories.
+            final byte[] flow1Ver1 = p.getFlowContent("bucket-id-A", 
"flow-id-1", 1);
+            assertEquals("Flow1 ver.1", new String(flow1Ver1, 
StandardCharsets.UTF_8));
+
+            final byte[] flow1Ver2 = p.getFlowContent("bucket-id-A", 
"flow-id-1", 2);
+            assertEquals("Flow1 ver.2", new String(flow1Ver2, 
StandardCharsets.UTF_8));
+
+            // Even if the name of flow has been changed, it can be retrieved 
by the same flow id.
+            final byte[] flow1Ver3 = p.getFlowContent("bucket-id-A", 
"flow-id-1", 3);
+            assertEquals("FlowOne ver.3", new String(flow1Ver3, 
StandardCharsets.UTF_8));
+
+            final byte[] flow2Ver1 = p.getFlowContent("bucket-id-A", 
"flow-id-2", 1);
+            assertEquals("FlowTwo ver.1", new String(flow2Ver1, 
StandardCharsets.UTF_8));
+
+            // Even if the name of bucket has been changed, it can be 
retrieved by the same flow id.
+            final byte[] flow2Ver2 = p.getFlowContent("bucket-id-A", 
"flow-id-2", 2);
+            assertEquals("FlowTwo ver.2", new String(flow2Ver2, 
StandardCharsets.UTF_8));
+
+            // Delete the 2nd flow.
+            p.deleteAllFlowContent("bucket-id-A", "flow-id-2");
+
+        }, false);
+
+        assertProvider(properties, g -> {
+            // Assert commit.
+            final AtomicInteger commitCount = new AtomicInteger(0);
+            final String[] commitMessages = {
+                    "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket 
New_name_for_Bucket_A:bucket-id-A.",
+                    "5th commit.",
+                    "4th commit.",
+                    "3rd commit.",
+                    "2nd commit.",
+                    "Initial commit."
+            };
+            for (RevCommit commit : g.log().call()) {
+                assertEquals("git-user", commit.getAuthorIdent().getName());
+                final int commitIndex = commitCount.getAndIncrement();
+                assertEquals(commitMessages[commitIndex], 
commit.getShortMessage());
+            }
+            assertEquals(commitMessages.length, commitCount.get());
+        }, p -> {
+            // Should be able to load flow from commit histories.
+            final byte[] flow1Ver1 = p.getFlowContent("bucket-id-A", 
"flow-id-1", 1);
+            assertEquals("Flow1 ver.1", new String(flow1Ver1, 
StandardCharsets.UTF_8));
+
+            final byte[] flow1Ver2 = p.getFlowContent("bucket-id-A", 
"flow-id-1", 2);
+            assertEquals("Flow1 ver.2", new String(flow1Ver2, 
StandardCharsets.UTF_8));
+
+            // Even if the name of flow has been changed, it can be retrieved 
by the same flow id.
+            final byte[] flow1Ver3 = p.getFlowContent("bucket-id-A", 
"flow-id-1", 3);
+            assertEquals("FlowOne ver.3", new String(flow1Ver3, 
StandardCharsets.UTF_8));
+
+            // The 2nd flow has been deleted, and should not exist.
+            try {
+                p.getFlowContent("bucket-id-A", "flow-id-2", 1);
+            } catch (FlowPersistenceException e) {
+                assertEquals("Flow ID flow-id-2 was not found in bucket 
New_name_for_Bucket_A:bucket-id-A.", e.getMessage());
+            }
+
+            try {
+                p.getFlowContent("bucket-id-A", "flow-id-2", 2);
+            } catch (FlowPersistenceException e) {
+                assertEquals("Flow ID flow-id-2 was not found in bucket 
New_name_for_Bucket_A:bucket-id-A.", e.getMessage());
+            }
+
+            // Delete the 1st flow, too.
+            p.deleteAllFlowContent("bucket-id-A", "flow-id-1");
+
+        }, false);
+
+        assertProvider(properties, g -> {
+            // Assert commit.
+            final AtomicInteger commitCount = new AtomicInteger(0);
+            final String[] commitMessages = {
+                    "Deleted flow FlowOne.snapshot:flow-id-1 in bucket 
New_name_for_Bucket_A:bucket-id-A.",
+                    "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket 
New_name_for_Bucket_A:bucket-id-A.",
+                    "5th commit.",
+                    "4th commit.",
+                    "3rd commit.",
+                    "2nd commit.",
+                    "Initial commit."
+            };
+            for (RevCommit commit : g.log().call()) {
+                assertEquals("git-user", commit.getAuthorIdent().getName());
+                final int commitIndex = commitCount.getAndIncrement();
+                assertEquals(commitMessages[commitIndex], 
commit.getShortMessage());
+            }
+            assertEquals(commitMessages.length, commitCount.get());
+        }, p -> {
+            // The 1st flow has been deleted, and should not exist. Moreover, 
the bucket A has been deleted since there's no flow.
+            try {
+                p.getFlowContent("bucket-id-A", "flow-id-1", 1);
+            } catch (FlowPersistenceException e) {
+                assertEquals("Bucket ID bucket-id-A was not found.", 
e.getMessage());
+            }
+        }, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java
 
b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java
index 75c76b7..584e2f7 100644
--- 
a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java
+++ 
b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java
@@ -23,6 +23,11 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class TestVersionedProcessGroupSerializer {
 
@@ -57,4 +62,69 @@ public class TestVersionedProcessGroupSerializer {
         Assert.assertEquals(processor1.getName(), 
deserializedProcessor1.getName());
 
     }
+
+    @Test
+    public void testDeserializeJsonNonIntegerVersion() throws IOException {
+        final String file = "/serialization/json/non-integer-version.snapshot";
+        final VersionedProcessGroupSerializer serializer = new 
VersionedProcessGroupSerializer();
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) 
{
+            try {
+                serializer.deserialize(is);
+                fail("Should fail");
+            } catch (SerializationException e) {
+                assertEquals("Unable to find a process group serializer 
compatible with the input.", e.getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testDeserializeJsonNoVersion() throws IOException {
+        final String file = "/serialization/json/no-version.snapshot";
+        final VersionedProcessGroupSerializer serializer = new 
VersionedProcessGroupSerializer();
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) 
{
+            try {
+                serializer.deserialize(is);
+                fail("Should fail");
+            } catch (SerializationException e) {
+                assertEquals("Unable to find a process group serializer 
compatible with the input.", e.getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testDeserializeVer1() throws IOException {
+        final String file = "/serialization/ver1.snapshot";
+        final VersionedProcessGroupSerializer serializer = new 
VersionedProcessGroupSerializer();
+        final VersionedProcessGroup processGroup;
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) 
{
+            processGroup = serializer.deserialize(is);
+        }
+        System.out.printf("processGroup=" + processGroup);
+    }
+
+    @Test
+    public void testDeserializeVer2() throws IOException {
+        final String file = "/serialization/ver2.snapshot";
+        final VersionedProcessGroupSerializer serializer = new 
VersionedProcessGroupSerializer();
+        final VersionedProcessGroup processGroup;
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) 
{
+            processGroup = serializer.deserialize(is);
+        }
+        System.out.printf("processGroup=" + processGroup);
+    }
+
+    @Test
+    public void testDeserializeVer3() throws IOException {
+        final String file = "/serialization/ver3.snapshot";
+        final VersionedProcessGroupSerializer serializer = new 
VersionedProcessGroupSerializer();
+        try (final InputStream is = this.getClass().getResourceAsStream(file)) 
{
+            try {
+                serializer.deserialize(is);
+                fail("Should fail");
+            } catch (SerializationException e) {
+                assertEquals("Unable to find a process group serializer 
compatible with the input.", e.getMessage());
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java
 
b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java
index 1ea9e7c..916e053 100644
--- 
a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java
+++ 
b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java
@@ -19,7 +19,7 @@ package org.apache.nifi.registry.serialization.jaxb;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.flow.VersionedProcessor;
 import org.apache.nifi.registry.serialization.SerializationException;
-import org.apache.nifi.registry.serialization.Serializer;
+import org.apache.nifi.registry.serialization.VersionedSerializer;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,7 +31,7 @@ public class TestJAXBVersionedProcessGroupSerializer {
 
     @Test
     public void testSerializeDeserializeFlowSnapshot() throws 
SerializationException {
-        final Serializer<VersionedProcessGroup> serializer = new 
JAXBVersionedProcessGroupSerializer();
+        final VersionedSerializer<VersionedProcessGroup> serializer = new 
JAXBVersionedProcessGroupSerializer();
 
         final VersionedProcessGroup processGroup1 = new 
VersionedProcessGroup();
         processGroup1.setIdentifier("pg1");
@@ -45,12 +45,18 @@ public class TestJAXBVersionedProcessGroupSerializer {
         processGroup1.getProcessors().add(processor1);
 
         final ByteArrayOutputStream out = new ByteArrayOutputStream();
-        serializer.serialize(processGroup1, out);
+        serializer.serialize(1, processGroup1, out);
 
         final String snapshotStr = new String(out.toByteArray(), 
StandardCharsets.UTF_8);
         //System.out.println(snapshotStr);
 
         final ByteArrayInputStream in = new 
ByteArrayInputStream(out.toByteArray());
+        in.mark(1024);
+        final int version = serializer.readDataModelVersion(in);
+
+        Assert.assertEquals(1, version);
+
+        in.reset();
         final VersionedProcessGroup deserializedProcessGroup1 = 
serializer.deserialize(in);
 
         Assert.assertEquals(processGroup1.getIdentifier(), 
deserializedProcessGroup1.getIdentifier());

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot
 
b/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot
new file mode 100644
index 0000000..ce1901f
--- /dev/null
+++ 
b/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot
@@ -0,0 +1,5 @@
+{
+  "header": {
+  },
+  "content": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot
 
b/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot
new file mode 100644
index 0000000..33d4da3
--- /dev/null
+++ 
b/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot
@@ -0,0 +1,6 @@
+{
+  "header": {
+    "dataModelVersion": "One"
+  },
+  "content": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot 
b/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot
new file mode 100644
index 0000000..7c1ab49
Binary files /dev/null and 
b/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot differ

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot 
b/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot
new file mode 100644
index 0000000..7f4dfc5
--- /dev/null
+++ b/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot
@@ -0,0 +1,97 @@
+{
+  "header": {
+    "dataModelVersion": 2
+  },
+  "content": {
+    "identifier": "a2c80883-171c-316d-ba25-24df2c352693",
+    "name": "Flow1",
+    "comments": "",
+    "position": {
+      "x": 1549.249149182042,
+      "y": 764.2426186568309
+    },
+    "processGroups": [],
+    "remoteProcessGroups": [],
+    "processors": [
+      {
+        "identifier": "92fe4513-21c0-34f6-a916-2874f46ae864",
+        "name": "GenerateFlowFile",
+        "comments": "",
+        "position": {
+          "x": 488.99999411591034,
+          "y": 114.00000359389122
+        },
+        "bundle": {
+          "group": "org.apache.nifi",
+          "artifact": "nifi-standard-nar",
+          "version": "1.6.0-SNAPSHOT"
+        },
+        "style": {},
+        "type": "org.apache.nifi.processors.standard.GenerateFlowFile",
+        "properties": {
+          "character-set": "UTF-8",
+          "File Size": "0B",
+          "Batch Size": "1",
+          "Unique FlowFiles": "false",
+          "Data Format": "Text"
+        },
+        "propertyDescriptors": {
+          "character-set": {
+            "name": "character-set",
+            "displayName": "Character Set",
+            "identifiesControllerService": false,
+            "sensitive": false
+          },
+          "File Size": {
+            "name": "File Size",
+            "displayName": "File Size",
+            "identifiesControllerService": false,
+            "sensitive": false
+          },
+          "generate-ff-custom-text": {
+            "name": "generate-ff-custom-text",
+            "displayName": "Custom Text",
+            "identifiesControllerService": false,
+            "sensitive": false
+          },
+          "Batch Size": {
+            "name": "Batch Size",
+            "displayName": "Batch Size",
+            "identifiesControllerService": false,
+            "sensitive": false
+          },
+          "Unique FlowFiles": {
+            "name": "Unique FlowFiles",
+            "displayName": "Unique FlowFiles",
+            "identifiesControllerService": false,
+            "sensitive": false
+          },
+          "Data Format": {
+            "name": "Data Format",
+            "displayName": "Data Format",
+            "identifiesControllerService": false,
+            "sensitive": false
+          }
+        },
+        "schedulingPeriod": "0 sec",
+        "schedulingStrategy": "TIMER_DRIVEN",
+        "executionNode": "ALL",
+        "penaltyDuration": "30 sec",
+        "yieldDuration": "1 sec",
+        "bulletinLevel": "WARN",
+        "runDurationMillis": 0,
+        "concurrentlySchedulableTaskCount": 1,
+        "componentType": "PROCESSOR",
+        "groupIdentifier": "a2c80883-171c-316d-ba25-24df2c352693"
+      }
+    ],
+    "inputPorts": [],
+    "outputPorts": [],
+    "connections": [],
+    "labels": [],
+    "funnels": [],
+    "controllerServices": [],
+    "variables": {},
+    "componentType": "PROCESS_GROUP"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot
----------------------------------------------------------------------
diff --git 
a/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot 
b/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot
new file mode 100644
index 0000000..574fe56
--- /dev/null
+++ b/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot
@@ -0,0 +1,6 @@
+{
+  "header": {
+    "dataModelVersion": 3
+  },
+  "content": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
 
b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
index c5e06f5..c4bdd46 100644
--- 
a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
+++ 
b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
@@ -56,4 +56,9 @@ public interface FlowSnapshotContext {
      */
     long getSnapshotTimestamp();
 
+    /**
+     * @return the name of the user who created the snapshot
+     */
+    String getAuthor();
+
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-resources/src/main/resources/conf/providers.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-resources/src/main/resources/conf/providers.xml 
b/nifi-registry-resources/src/main/resources/conf/providers.xml
index 40bf012..720bee2 100644
--- a/nifi-registry-resources/src/main/resources/conf/providers.xml
+++ b/nifi-registry-resources/src/main/resources/conf/providers.xml
@@ -20,4 +20,13 @@
         <property name="Flow Storage Directory">./flow_storage</property>
     </flowPersistenceProvider>
 
+    <!--
+    <flowPersistenceProvider>
+        
<class>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</class>
+        <property name="Flow Storage Directory">./flow_storage</property>
+        <property name="Remote To Push"></property>
+        <property name="Remote Access User"></property>
+        <property name="Remote Access Password"></property>
+    </flowPersistenceProvider>
+    -->
 </providers>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java
 
b/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java
index b7476b9..5abaf7e 100644
--- 
a/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java
+++ 
b/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java
@@ -389,4 +389,38 @@ public class FileUtils {
             /* do nothing */
         }
     }
+
+
+    // The invalid character list is derived from this Stackoverflow page.
+    // 
https://stackoverflow.com/questions/1155107/is-there-a-cross-platform-java-method-to-remove-filename-special-chars
+    private final static int[] INVALID_CHARS = {34, 60, 62, 124, 0, 1, 2, 3, 
4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
+            16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 
58, 42, 63, 92, 47, 32};
+
+    static {
+        Arrays.sort(INVALID_CHARS);
+    }
+
+    /**
+     * Replaces invalid characters for a file system name within a given 
filename string to underscore '_'.
+     * Be careful not to pass a file path as this method replaces path 
delimiter characters (i.e forward/back slashes).
+     * @param filename The filename to clean
+     * @return sanitized filename
+     */
+    public static String sanitizeFilename(String filename) {
+        if (filename == null || filename.isEmpty()) {
+            return filename;
+        }
+        int codePointCount = filename.codePointCount(0, filename.length());
+
+        final StringBuilder cleanName = new StringBuilder();
+        for (int i = 0; i < codePointCount; i++) {
+            int c = filename.codePointAt(i);
+            if (Arrays.binarySearch(INVALID_CHARS, c) < 0) {
+                cleanName.appendCodePoint(c);
+            } else {
+                cleanName.append('_');
+            }
+        }
+        return cleanName.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java
 
b/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java
new file mode 100644
index 0000000..d4bc963
--- /dev/null
+++ 
b/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestFileUtils {
+    @Test
+    public void testSanitizeFilename() {
+        String filename = "This / is / a test";
+        final String sanitizedFilename = FileUtils.sanitizeFilename(filename);
+        assertEquals("This___is___a_test", sanitizedFilename);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/c01f79a2/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java
 
b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java
index a2174ba..10a044a 100644
--- 
a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java
+++ 
b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java
@@ -16,10 +16,7 @@
  */
 package org.apache.nifi.registry.web.mapper;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import org.apache.nifi.registry.serialization.jackson.ObjectMapperProvider;
 import 
org.glassfish.jersey.jackson.internal.jackson.jaxrs.json.JacksonJaxbJsonProvider;
 import org.springframework.stereotype.Component;
 
@@ -32,17 +29,8 @@ import javax.ws.rs.ext.Provider;
 @Produces(MediaType.APPLICATION_JSON)
 public class NiFiRegistryJsonProvider extends JacksonJaxbJsonProvider {
 
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    static {
-        mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-        
mapper.setPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL,
 JsonInclude.Include.NON_NULL));
-        mapper.setAnnotationIntrospector(new 
JaxbAnnotationIntrospector(mapper.getTypeFactory()));
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-    }
-
     public NiFiRegistryJsonProvider() {
         super();
-        setMapper(mapper);
+        setMapper(ObjectMapperProvider.getMapper());
     }
 }

Reply via email to