This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c706877147 NIFI-12297 Standardized File Path resolution in Persistence
Providers (#7975)
c706877147 is described below
commit c706877147d89b7947f48cea68b3577bbfe3798d
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Nov 3 11:26:11 2023 -0500
NIFI-12297 Standardized File Path resolution in Persistence Providers
(#7975)
---
.../FileSystemBundlePersistenceProvider.java | 53 ++++++--
.../flow/FileSystemFlowPersistenceProvider.java | 72 ++++++++---
.../TestFileSystemBundlePersistenceProvider.java | 117 ++++++++----------
.../TestFileSystemFlowPersistenceProvider.java | 137 ++++++++++-----------
.../registry/FileSystemFlowRegistryClient.java | 122 +++++++++++-------
.../tests/system/registry/RegistryClientIT.java | 30 ++---
.../tests/system/stateless/StatelessBasicsIT.java | 2 +-
7 files changed, 306 insertions(+), 227 deletions(-)
diff --git
a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/FileSystemBundlePersistenceProvider.java
b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/FileSystemBundlePersistenceProvider.java
index 555260a227..96bbceaffa 100644
---
a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/FileSystemBundlePersistenceProvider.java
+++
b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/FileSystemBundlePersistenceProvider.java
@@ -39,7 +39,12 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* An {@link BundlePersistenceProvider} that uses local file-system for
storage.
@@ -53,6 +58,8 @@ public class FileSystemBundlePersistenceProvider implements
BundlePersistencePro
static final String NAR_EXTENSION = ".nar";
static final String CPP_EXTENSION = ".cpp";
+ private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+
private File bundleStorageDir;
@Override
@@ -71,8 +78,7 @@ public class FileSystemBundlePersistenceProvider implements
BundlePersistencePro
try {
bundleStorageDir = new File(bundleStorageDirValue);
FileUtils.ensureDirectoryExistAndCanReadAndWrite(bundleStorageDir);
- LOGGER.info("Configured BundlePersistenceProvider with Extension
Bundle Storage Directory {}",
- new Object[] {bundleStorageDir.getAbsolutePath()});
+ LOGGER.info("Configured BundlePersistenceProvider with Extension
Bundle Storage Directory {}", bundleStorageDir.getAbsolutePath());
} catch (IOException e) {
throw new ProviderCreationException(e);
}
@@ -107,7 +113,7 @@ public class FileSystemBundlePersistenceProvider implements
BundlePersistencePro
}
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Writing extension bundle to {}", new
Object[]{bundleFile.getAbsolutePath()});
+ LOGGER.debug("Writing extension bundle to {}",
bundleFile.getAbsolutePath());
}
try (final OutputStream out = new FileOutputStream(bundleFile)) {
@@ -124,7 +130,7 @@ public class FileSystemBundlePersistenceProvider implements
BundlePersistencePro
final File bundleFile = getBundleFile(versionCoordinate);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Reading extension bundle from {}", new
Object[]{bundleFile.getAbsolutePath()});
+ LOGGER.debug("Reading extension bundle from {}",
bundleFile.getAbsolutePath());
}
try (final InputStream in = new FileInputStream(bundleFile);
@@ -142,7 +148,7 @@ public class FileSystemBundlePersistenceProvider implements
BundlePersistencePro
public synchronized void deleteBundleVersion(final BundleVersionCoordinate
versionCoordinate) throws BundlePersistenceException {
final File bundleFile = getBundleFile(versionCoordinate);
if (!bundleFile.exists()) {
- LOGGER.warn("Extension bundle content does not exist at {}", new
Object[] {bundleFile.getAbsolutePath()});
+ LOGGER.warn("Extension bundle content does not exist at {}",
bundleFile.getAbsolutePath());
return;
}
@@ -152,7 +158,7 @@ public class FileSystemBundlePersistenceProvider implements
BundlePersistencePro
}
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Deleted extension bundle content at {}", new
Object[] {bundleFile.getAbsolutePath()});
+ LOGGER.debug("Deleted extension bundle content at {}",
bundleFile.getAbsolutePath());
}
}
@@ -160,7 +166,7 @@ public class FileSystemBundlePersistenceProvider implements
BundlePersistencePro
public synchronized void deleteAllBundleVersions(final BundleCoordinate
bundleCoordinate) throws BundlePersistenceException {
final File bundleDir = getBundleDirectory(bundleStorageDir,
bundleCoordinate);
if (!bundleDir.exists()) {
- LOGGER.warn("Extension bundle directory does not exist at {}", new
Object[] {bundleDir.getAbsolutePath()});
+ LOGGER.warn("Extension bundle directory does not exist at {}",
bundleDir.getAbsolutePath());
return;
}
@@ -207,7 +213,8 @@ public class FileSystemBundlePersistenceProvider implements
BundlePersistencePro
final String groupId = bundleCoordinate.getGroupId();
final String artifactId = bundleCoordinate.getArtifactId();
- return new File(bundleStorageDir, sanitize(bucketId) + "/" +
sanitize(groupId) + "/" + sanitize(artifactId));
+ final Path artifactPath = getArtifactPath(bucketId, groupId,
artifactId);
+ return getChildLocation(bundleStorageDir, artifactPath);
}
static File getBundleVersionDirectory(final File bundleStorageDir, final
BundleVersionCoordinate versionCoordinate) {
@@ -216,7 +223,9 @@ public class FileSystemBundlePersistenceProvider implements
BundlePersistencePro
final String artifactId = versionCoordinate.getArtifactId();
final String version = versionCoordinate.getVersion();
- return new File(bundleStorageDir, sanitize(bucketId) + "/" +
sanitize(groupId) + "/" + sanitize(artifactId) + "/" + sanitize(version));
+ final Path artifactPath = getArtifactPath(bucketId, groupId,
artifactId);
+ final Path versionPath = Paths.get(sanitize(version)).normalize();
+ return getChildLocation(bundleStorageDir,
artifactPath.resolve(versionPath));
}
static File getBundleFile(final File parentDir, final
BundleVersionCoordinate versionCoordinate) {
@@ -227,7 +236,11 @@ public class FileSystemBundlePersistenceProvider
implements BundlePersistencePro
final String bundleFileExtension = getBundleFileExtension(bundleType);
final String bundleFilename = sanitize(artifactId) + "-" +
sanitize(version) + bundleFileExtension;
- return new File(parentDir, bundleFilename);
+ return getChildLocation(parentDir, Paths.get(bundleFilename));
+ }
+
+ static Path getArtifactPath(final String bucketId, final String groupId,
final String artifactId) {
+ return Paths.get(getNormalizedBucketId(bucketId), sanitize(groupId),
sanitize(artifactId)).normalize();
}
static String sanitize(final String input) {
@@ -246,4 +259,24 @@ public class FileSystemBundlePersistenceProvider
implements BundlePersistencePro
}
}
+ private static String getNormalizedBucketId(final String id) {
+ final String sanitized =
FileUtils.sanitizeFilename(id).trim().toLowerCase();
+ final Matcher matcher = NUMBER_PATTERN.matcher(sanitized);
+ if (matcher.matches()) {
+ final int normalized = Integer.parseInt(sanitized);
+ return Integer.toString(normalized);
+ } else {
+ final UUID normalized = UUID.fromString(id);
+ return normalized.toString();
+ }
+ }
+
+ private static File getChildLocation(final File parentDir, final Path
childLocation) {
+ final Path parentPath = parentDir.toPath().normalize();
+ final Path childPath = parentPath.resolve(childLocation.normalize());
+ if (childPath.startsWith(parentPath)) {
+ return childPath.toFile();
+ }
+ throw new IllegalArgumentException(String.format("Child location not
valid [%s]", childLocation));
+ }
}
diff --git
a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
index 071656d0eb..34fe9c8e74 100644
---
a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
+++
b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
@@ -33,7 +33,12 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* A FlowPersistenceProvider that uses the local filesystem for storage.
@@ -46,6 +51,8 @@ public class FileSystemFlowPersistenceProvider implements
FlowPersistenceProvide
static final String SNAPSHOT_EXTENSION = ".snapshot";
+ private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+
private File flowStorageDir;
@Override
@@ -63,7 +70,7 @@ public class FileSystemFlowPersistenceProvider implements
FlowPersistenceProvide
try {
flowStorageDir = new File(flowStorageDirValue);
FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir);
- LOGGER.info("Configured FileSystemFlowPersistenceProvider with
Flow Storage Directory {}", new Object[] {flowStorageDir.getAbsolutePath()});
+ LOGGER.info("Configured FileSystemFlowPersistenceProvider with
Flow Storage Directory {}", flowStorageDir.getAbsolutePath());
} catch (IOException e) {
throw new ProviderCreationException(e);
}
@@ -71,14 +78,14 @@ public class FileSystemFlowPersistenceProvider implements
FlowPersistenceProvide
@Override
public synchronized void saveFlowContent(final FlowSnapshotContext
context, final byte[] content) throws FlowPersistenceException {
- final File bucketDir = new File(flowStorageDir, context.getBucketId());
+ final File bucketDir = getChildLocation(flowStorageDir,
getNormalizedIdPath(context.getBucketId()));
try {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(bucketDir);
} catch (IOException e) {
throw new FlowPersistenceException("Error accessing bucket
directory at " + bucketDir.getAbsolutePath(), e);
}
- final File flowDir = new File(bucketDir, context.getFlowId());
+ final File flowDir = getChildLocation(bucketDir,
getNormalizedIdPath(context.getFlowId()));
try {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowDir);
} catch (IOException e) {
@@ -86,27 +93,28 @@ public class FileSystemFlowPersistenceProvider implements
FlowPersistenceProvide
}
final String versionString = String.valueOf(context.getVersion());
- final File versionDir = new File(flowDir, versionString);
+ final File versionDir = getChildLocation(flowDir,
Paths.get(versionString));
try {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(versionDir);
} catch (IOException e) {
throw new FlowPersistenceException("Error accessing version
directory at " + versionDir.getAbsolutePath(), e);
}
- final File versionFile = new File(versionDir, versionString +
SNAPSHOT_EXTENSION);
+ final String versionExtension = versionString + SNAPSHOT_EXTENSION;
+ final File versionFile = getChildLocation(versionDir,
Paths.get(versionExtension));
if (versionFile.exists()) {
throw new FlowPersistenceException("Unable to save, a snapshot
already exists with version " + versionString);
}
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Saving snapshot with filename {}", new Object[]
{versionFile.getAbsolutePath()});
+ LOGGER.debug("Saving snapshot with filename {}",
versionFile.getAbsolutePath());
}
try (final OutputStream out = new FileOutputStream(versionFile)) {
out.write(content);
out.flush();
} catch (Exception e) {
- throw new FlowPersistenceException("Unable to write snapshot to
disk due to " + e.getMessage(), e);
+ throw new FlowPersistenceException("Unable to write snapshot to
disk", e);
}
}
@@ -114,7 +122,7 @@ public class FileSystemFlowPersistenceProvider implements
FlowPersistenceProvide
public synchronized byte[] getFlowContent(final String bucketId, final
String flowId, final int version) throws FlowPersistenceException {
final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Retrieving snapshot with filename {}", new Object[]
{snapshotFile.getAbsolutePath()});
+ LOGGER.debug("Retrieving snapshot with filename {}",
snapshotFile.getAbsolutePath());
}
if (!snapshotFile.exists()) {
@@ -130,9 +138,12 @@ public class FileSystemFlowPersistenceProvider implements
FlowPersistenceProvide
@Override
public synchronized void deleteAllFlowContent(final String bucketId, final
String flowId) throws FlowPersistenceException {
- final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId);
+ final Path bucketIdPath = getNormalizedIdPath(bucketId);
+ final Path flowIdPath = getNormalizedIdPath(flowId);
+ final Path bucketFlowPath = bucketIdPath.resolve(flowIdPath);
+ final File flowDir = getChildLocation(flowStorageDir, bucketFlowPath);
if (!flowDir.exists()) {
- LOGGER.debug("Snapshot directory does not exist at {}", new
Object[] {flowDir.getAbsolutePath()});
+ LOGGER.debug("Snapshot directory does not exist at {}",
flowDir.getAbsolutePath());
return;
}
@@ -150,12 +161,12 @@ public class FileSystemFlowPersistenceProvider implements
FlowPersistenceProvide
}
// delete the directory for the bucket if there is nothing left
- final File bucketDir = new File(flowStorageDir, bucketId);
+ final File bucketDir = getChildLocation(flowStorageDir,
getNormalizedIdPath(bucketId));
final File[] bucketFiles = bucketDir.listFiles();
- if (bucketFiles.length == 0) {
+ if (bucketFiles == null || bucketFiles.length == 0) {
final boolean deletedBucket = bucketDir.delete();
if (!deletedBucket) {
- LOGGER.error("Unable to delete bucket directory: " +
flowDir.getAbsolutePath());
+ LOGGER.error("Unable to delete bucket directory: {}",
flowDir.getAbsolutePath());
}
}
}
@@ -164,7 +175,7 @@ public class FileSystemFlowPersistenceProvider implements
FlowPersistenceProvide
public synchronized void deleteFlowContent(final String bucketId, final
String flowId, final int version) throws FlowPersistenceException {
final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
if (!snapshotFile.exists()) {
- LOGGER.debug("Snapshot file does not exist at {}", new Object[]
{snapshotFile.getAbsolutePath()});
+ LOGGER.debug("Snapshot file does not exist at {}",
snapshotFile.getAbsolutePath());
return;
}
@@ -174,13 +185,40 @@ public class FileSystemFlowPersistenceProvider implements
FlowPersistenceProvide
}
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Deleted snapshot at {}", new Object[]
{snapshotFile.getAbsolutePath()});
+ LOGGER.debug("Deleted snapshot at {}",
snapshotFile.getAbsolutePath());
}
}
protected File getSnapshotFile(final String bucketId, final String flowId,
final int version) {
- final String snapshotFilename = bucketId + "/" + flowId + "/" +
version + "/" + version + SNAPSHOT_EXTENSION;
- return new File(flowStorageDir, snapshotFilename);
+ final String versionExtension = version + SNAPSHOT_EXTENSION;
+ final Path snapshotLocation = Paths.get(getNormalizedId(bucketId),
getNormalizedId(flowId), Integer.toString(version), versionExtension);
+ return getChildLocation(flowStorageDir, snapshotLocation);
+ }
+
+ private File getChildLocation(final File parentDir, final Path
childLocation) {
+ final Path parentPath = parentDir.toPath().normalize();
+ final Path childPathNormalized = childLocation.normalize();
+ final Path childPath = parentPath.resolve(childPathNormalized);
+ if (childPath.startsWith(parentPath)) {
+ return childPath.toFile();
+ }
+ throw new IllegalArgumentException(String.format("Child location not
valid [%s]", childLocation));
}
+ private Path getNormalizedIdPath(final String id) {
+ final String normalizedId = getNormalizedId(id);
+ return Paths.get(normalizedId).normalize();
+ }
+
+ private String getNormalizedId(final String input) {
+ final String sanitized =
FileUtils.sanitizeFilename(input).trim().toLowerCase();
+ final Matcher matcher = NUMBER_PATTERN.matcher(sanitized);
+ if (matcher.matches()) {
+ final int normalized = Integer.parseInt(sanitized);
+ return Integer.toString(normalized);
+ } else {
+ final UUID normalized = UUID.fromString(input);
+ return normalized.toString();
+ }
+ }
}
diff --git
a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/extension/TestFileSystemBundlePersistenceProvider.java
b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/extension/TestFileSystemBundlePersistenceProvider.java
index fe8dc0749f..9a0eea300d 100644
---
a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/extension/TestFileSystemBundlePersistenceProvider.java
+++
b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/extension/TestFileSystemBundlePersistenceProvider.java
@@ -43,20 +43,28 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.when;
public class TestFileSystemBundlePersistenceProvider {
static final String EXTENSION_STORAGE_DIR = "target/extension_storage";
- static final ProviderConfigurationContext CONFIGURATION_CONTEXT = new
ProviderConfigurationContext() {
- @Override
- public Map<String, String> getProperties() {
- final Map<String,String> props = new HashMap<>();
-
props.put(FileSystemBundlePersistenceProvider.BUNDLE_STORAGE_DIR_PROP,
EXTENSION_STORAGE_DIR);
- return props;
- }
+ private static final String BUCKET_ID =
"b0000000-0000-0000-0000-000000000000";
+
+ private static final String SECOND_BUCKET_ID =
"b2000000-0000-0000-0000-000000000000";
+
+ private static final String GROUP_ID =
"c0000000-0000-0000-0000-000000000000";
+
+ private static final String ARTIFACT_ID =
"a0000000-0000-0000-0000-000000000000";
+
+ private static final String FIRST_VERSION = "1.0.0";
+
+ private static final String SECOND_VERSION = "1.1.0";
+
+ static final ProviderConfigurationContext CONFIGURATION_CONTEXT = () -> {
+ final Map<String,String> props = new HashMap<>();
+ props.put(FileSystemBundlePersistenceProvider.BUNDLE_STORAGE_DIR_PROP,
EXTENSION_STORAGE_DIR);
+ return props;
};
private File bundleStorageDir;
@@ -82,20 +90,20 @@ public class TestFileSystemBundlePersistenceProvider {
final BundleVersionType type = BundleVersionType.NIFI_NAR;
// first version in b1
- final String content1 = "g1-a1-1.0.0";
- final BundleVersionCoordinate versionCoordinate1 =
getVersionCoordinate("b1", "g1", "a1", "1.0.0", type);
+ final String content1 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, FIRST_VERSION);
+ final BundleVersionCoordinate versionCoordinate1 =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate1 ,
content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate1, content1);
// second version in b1
- final String content2 = "g1-a1-1.1.0";
- final BundleVersionCoordinate versionCoordinate2 =
getVersionCoordinate("b1", "g1", "a1", "1.1.0", type);
+ final String content2 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, SECOND_VERSION);
+ final BundleVersionCoordinate versionCoordinate2 =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, SECOND_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate2,
content2);
verifyBundleVersion(bundleStorageDir, versionCoordinate2, content2);
// same bundle but in b2
- final String content3 = "g1-a1-1.1.0";
- final BundleVersionCoordinate versionCoordinate3 =
getVersionCoordinate("b2", "g1", "a1", "1.1.0", type);
+ final String content3 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, SECOND_VERSION);
+ final BundleVersionCoordinate versionCoordinate3 =
getVersionCoordinate(SECOND_BUCKET_ID, GROUP_ID, ARTIFACT_ID, SECOND_VERSION,
type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate3,
content3);
verifyBundleVersion(bundleStorageDir, versionCoordinate3, content2);
}
@@ -104,19 +112,14 @@ public class TestFileSystemBundlePersistenceProvider {
public void testCreateWhenBundleVersionAlreadyExists() throws IOException {
final BundleVersionType type = BundleVersionType.NIFI_NAR;
- final String content1 = "g1-a1-1.0.0";
- final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate("b1", "g1", "a1", "1.0.0", type);
+ final String content1 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, FIRST_VERSION);
+ final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate,
content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
// try to save same bundle version that already exists
- try {
- final String newContent = "new content";
- createBundleVersion(fileSystemBundleProvider, versionCoordinate,
newContent);
- fail("Should have thrown exception");
- } catch (BundlePersistenceException e) {
- // expected
- }
+ final String newContent = "new content";
+ assertThrows(BundlePersistenceException.class, () ->
createBundleVersion(fileSystemBundleProvider, versionCoordinate, newContent));
// verify existing content wasn't modified
verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
@@ -126,8 +129,8 @@ public class TestFileSystemBundlePersistenceProvider {
public void testUpdateWhenBundleVersionAlreadyExists() throws IOException {
final BundleVersionType type = BundleVersionType.NIFI_NAR;
- final String content1 = "g1-a1-1.0.0";
- final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate("b1", "g1", "a1", "1.0.0", type);
+ final String content1 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, FIRST_VERSION);
+ final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate,
content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
@@ -146,17 +149,14 @@ public class TestFileSystemBundlePersistenceProvider {
@Test
public void testCreateAndGet() throws IOException {
- final String bucketId = "b1";
- final String groupId = "g1";
- final String artifactId = "a1";
final BundleVersionType type = BundleVersionType.NIFI_NAR;
- final String content1 = groupId + "-" + artifactId + "-" + "1.0.0";
- final BundleVersionCoordinate versionCoordinate1 =
getVersionCoordinate(bucketId, groupId, artifactId, "1.0.0", type);
+ final String content1 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, FIRST_VERSION);
+ final BundleVersionCoordinate versionCoordinate1 =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
createBundleVersion(fileSystemBundleProvider,versionCoordinate1,
content1);
- final String content2 = groupId + "-" + artifactId + "-" + "1.1.0";
- final BundleVersionCoordinate versionCoordinate2 =
getVersionCoordinate(bucketId, groupId, artifactId, "1.1.0", type);
+ final String content2 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, SECOND_VERSION);
+ final BundleVersionCoordinate versionCoordinate2 =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, SECOND_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate2,
content2);
try (final OutputStream out = new ByteArrayOutputStream()) {
@@ -176,30 +176,22 @@ public class TestFileSystemBundlePersistenceProvider {
@Test
public void testGetWhenDoesNotExist() throws IOException {
- final String bucketId = "b1";
- final String groupId = "g1";
- final String artifactId = "a1";
- final String version = "1.0.0";
final BundleVersionType type = BundleVersionType.NIFI_NAR;
try (final OutputStream out = new ByteArrayOutputStream()) {
- final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate(bucketId, groupId, artifactId, version, type);
+ final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
assertThrows(BundlePersistenceException.class, () ->
fileSystemBundleProvider.getBundleVersionContent(versionCoordinate, out));
}
}
@Test
public void testDeleteExtensionBundleVersion() throws IOException {
- final String bucketId = "b1";
- final String groupId = "g1";
- final String artifactId = "a1";
- final String version = "1.0.0";
final BundleVersionType bundleType = BundleVersionType.NIFI_NAR;
- final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate(bucketId, groupId, artifactId, version, bundleType);
+ final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION,
bundleType);
// create and verify the bundle version
- final String content1 = groupId + "-" + artifactId + "-" + version;
+ final String content1 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, FIRST_VERSION);
createBundleVersion(fileSystemBundleProvider, versionCoordinate,
content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
@@ -213,14 +205,10 @@ public class TestFileSystemBundlePersistenceProvider {
}
@Test
- public void testDeleteExtensionBundleVersionWhenDoesNotExist() throws
IOException {
- final String bucketId = "b1";
- final String groupId = "g1";
- final String artifactId = "a1";
- final String version = "1.0.0";
+ public void testDeleteExtensionBundleVersionWhenDoesNotExist() {
final BundleVersionType bundleType = BundleVersionType.NIFI_NAR;
- final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate(bucketId, groupId, artifactId, version, bundleType);
+ final BundleVersionCoordinate versionCoordinate =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION,
bundleType);
// verify the bundle version does not already exist
final File bundleVersionDir =
FileSystemBundlePersistenceProvider.getBundleVersionDirectory(bundleStorageDir,
versionCoordinate);
@@ -233,39 +221,30 @@ public class TestFileSystemBundlePersistenceProvider {
@Test
public void testDeleteAllBundleVersions() throws IOException {
- final String bucketId = "b1";
- final String groupId = "g1";
- final String artifactId = "a1";
- final String version1 = "1.0.0";
- final String version2 = "2.0.0";
final BundleVersionType bundleType = BundleVersionType.NIFI_NAR;
// create and verify the bundle version 1
- final String content1 = groupId + "-" + artifactId + "-" + version1;
- final BundleVersionCoordinate versionCoordinate1 =
getVersionCoordinate(bucketId, groupId, artifactId, version1, bundleType);
+ final String content1 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, FIRST_VERSION);
+ final BundleVersionCoordinate versionCoordinate1 =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION,
bundleType);
createBundleVersion(fileSystemBundleProvider, versionCoordinate1,
content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate1, content1);
// create and verify the bundle version 2
- final String content2 = groupId + "-" + artifactId + "-" + version2;
- final BundleVersionCoordinate versionCoordinate2 =
getVersionCoordinate(bucketId, groupId, artifactId, version2, bundleType);
+ final String content2 = String.format("%s-%s-%s", GROUP_ID,
ARTIFACT_ID, SECOND_VERSION);
+ final BundleVersionCoordinate versionCoordinate2 =
getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, SECOND_VERSION,
bundleType);
createBundleVersion(fileSystemBundleProvider, versionCoordinate2,
content2);
verifyBundleVersion(bundleStorageDir, versionCoordinate2, content2);
assertEquals(1, bundleStorageDir.listFiles().length);
- final BundleCoordinate bundleCoordinate =
getBundleCoordinate(bucketId, groupId, artifactId);
+ final BundleCoordinate bundleCoordinate = getBundleCoordinate();
fileSystemBundleProvider.deleteAllBundleVersions(bundleCoordinate);
assertEquals(0, bundleStorageDir.listFiles().length);
}
@Test
- public void testDeleteAllBundleVersionsWhenDoesNotExist() throws
IOException {
- final String bucketId = "b1";
- final String groupId = "g1";
- final String artifactId = "a1";
-
+ public void testDeleteAllBundleVersionsWhenDoesNotExist() {
assertEquals(0, bundleStorageDir.listFiles().length);
- final BundleCoordinate bundleCoordinate =
getBundleCoordinate(bucketId, groupId, artifactId);
+ final BundleCoordinate bundleCoordinate = getBundleCoordinate();
fileSystemBundleProvider.deleteAllBundleVersions(bundleCoordinate);
assertEquals(0, bundleStorageDir.listFiles().length);
}
@@ -306,11 +285,11 @@ public class TestFileSystemBundlePersistenceProvider {
return coordinate;
}
- private static BundleCoordinate getBundleCoordinate(final String bucketId,
final String groupId, final String artifactId) {
+ private static BundleCoordinate getBundleCoordinate() {
final BundleCoordinate coordinate =
Mockito.mock(BundleCoordinate.class);
- when(coordinate.getBucketId()).thenReturn(bucketId);
- when(coordinate.getGroupId()).thenReturn(groupId);
- when(coordinate.getArtifactId()).thenReturn(artifactId);
+ when(coordinate.getBucketId()).thenReturn(BUCKET_ID);
+ when(coordinate.getGroupId()).thenReturn(GROUP_ID);
+ when(coordinate.getArtifactId()).thenReturn(ARTIFACT_ID);
return coordinate;
}
diff --git
a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestFileSystemFlowPersistenceProvider.java
b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestFileSystemFlowPersistenceProvider.java
index c8f7c06548..d7e88ef9f1 100644
---
a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestFileSystemFlowPersistenceProvider.java
+++
b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestFileSystemFlowPersistenceProvider.java
@@ -36,21 +36,30 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.when;
public class TestFileSystemFlowPersistenceProvider {
+ private static final String BUCKET_ID =
"b0000000-0000-0000-0000-000000000000";
+
+ private static final String SECOND_BUCKET_ID =
"b2000000-0000-0000-0000-000000000000";
+
+ private static final String FLOW_ID =
"f0000000-0000-0000-0000-000000000000";
+
+ private static final String SECOND_FLOW_ID =
"f2000000-0000-0000-0000-000000000000";
+
+ private static final String FIRST_VERSION = "1.0.0";
+
+ private static final String SECOND_VERSION = "1.1.0";
+
static final String FLOW_STORAGE_DIR = "target/flow_storage";
- static final ProviderConfigurationContext CONFIGURATION_CONTEXT = new
ProviderConfigurationContext() {
- @Override
- public Map<String, String> getProperties() {
- final Map<String,String> props = new HashMap<>();
- props.put(FileSystemFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP,
FLOW_STORAGE_DIR);
- return props;
- }
+ static final ProviderConfigurationContext CONFIGURATION_CONTEXT = () -> {
+ final Map<String,String> props = new HashMap<>();
+ props.put(FileSystemFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP,
FLOW_STORAGE_DIR);
+ return props;
};
private File flowStorageDir;
@@ -61,7 +70,7 @@ public class TestFileSystemFlowPersistenceProvider {
flowStorageDir = new File(FLOW_STORAGE_DIR);
if (flowStorageDir.exists()) {
org.apache.commons.io.FileUtils.cleanDirectory(flowStorageDir);
- flowStorageDir.delete();
+ assertTrue(flowStorageDir.delete());
}
assertFalse(flowStorageDir.exists());
@@ -73,24 +82,18 @@ public class TestFileSystemFlowPersistenceProvider {
@Test
public void testSaveSuccessfully() throws IOException {
- createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow1", 1,
"flow1v1");
- verifySnapshot(flowStorageDir, "bucket1", "flow1", 1, "flow1v1");
+ createAndSaveSnapshot(fileSystemFlowProvider, 1, FIRST_VERSION);
+ verifySnapshot(flowStorageDir, 1, FIRST_VERSION);
- createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow1", 2,
"flow1v2");
- verifySnapshot(flowStorageDir, "bucket1", "flow1", 2, "flow1v2");
-
- createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow2", 1,
"flow2v1");
- verifySnapshot(flowStorageDir, "bucket1", "flow2", 1, "flow2v1");
-
- createAndSaveSnapshot(fileSystemFlowProvider,"bucket2", "flow3", 1,
"flow3v1");
- verifySnapshot(flowStorageDir, "bucket2", "flow3", 1, "flow3v1");
+ createAndSaveSnapshot(fileSystemFlowProvider, 2, SECOND_VERSION);
+ verifySnapshot(flowStorageDir, 2, SECOND_VERSION);
}
@Test
public void testSaveWithExistingVersion() throws IOException {
final FlowSnapshotContext context =
Mockito.mock(FlowSnapshotContext.class);
- when(context.getBucketId()).thenReturn("bucket1");
- when(context.getFlowId()).thenReturn("flow1");
+ when(context.getBucketId()).thenReturn(BUCKET_ID);
+ when(context.getFlowId()).thenReturn(FLOW_ID);
when(context.getVersion()).thenReturn(1);
final byte[] content = "flow1v1".getBytes(StandardCharsets.UTF_8);
@@ -98,108 +101,96 @@ public class TestFileSystemFlowPersistenceProvider {
// save new content for an existing version
final byte[] content2 = "XXX".getBytes(StandardCharsets.UTF_8);
- try {
- fileSystemFlowProvider.saveFlowContent(context, content2);
- fail("Should have thrown exception");
- } catch (Exception e) {
-
- }
+ assertThrows(Exception.class, () ->
fileSystemFlowProvider.saveFlowContent(context, content2));
// verify the new content wasn't written
- final File flowSnapshotFile = new File(flowStorageDir,
"bucket1/flow1/1/1" + FileSystemFlowPersistenceProvider.SNAPSHOT_EXTENSION);
+ final String path = String.format("%s/%s/1/1%s", BUCKET_ID, FLOW_ID,
FileSystemFlowPersistenceProvider.SNAPSHOT_EXTENSION);
+ final File flowSnapshotFile = new File(flowStorageDir, path);
try (InputStream in = new FileInputStream(flowSnapshotFile)) {
assertEquals("flow1v1", IOUtils.toString(in,
StandardCharsets.UTF_8));
}
}
@Test
- public void testSaveAndGet() throws IOException {
- createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow1", 1,
"flow1v1");
- createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow1", 2,
"flow1v2");
+ public void testSaveAndGet() {
+ createAndSaveSnapshot(fileSystemFlowProvider, 1, FIRST_VERSION);
+ createAndSaveSnapshot(fileSystemFlowProvider, 2, SECOND_VERSION);
- final byte[] flow1v1 =
fileSystemFlowProvider.getFlowContent("bucket1", "flow1", 1);
- assertEquals("flow1v1", new String(flow1v1, StandardCharsets.UTF_8));
+ final byte[] flow1v1 =
fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 1);
+ assertEquals(FIRST_VERSION, new String(flow1v1,
StandardCharsets.UTF_8));
- final byte[] flow1v2 =
fileSystemFlowProvider.getFlowContent("bucket1", "flow1", 2);
- assertEquals("flow1v2", new String(flow1v2, StandardCharsets.UTF_8));
+ final byte[] flow1v2 =
fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 2);
+ assertEquals(SECOND_VERSION, new String(flow1v2,
StandardCharsets.UTF_8));
}
@Test
public void testGetWhenDoesNotExist() {
- final byte[] flow1v1 =
fileSystemFlowProvider.getFlowContent("bucket1", "flow1", 1);
+ final byte[] flow1v1 =
fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 1);
assertNull(flow1v1);
}
@Test
- public void testDeleteSnapshots() throws IOException {
- final String bucketId = "bucket1";
- final String flowId = "flow1";
-
- createAndSaveSnapshot(fileSystemFlowProvider, bucketId, flowId, 1,
"flow1v1");
- createAndSaveSnapshot(fileSystemFlowProvider, bucketId, flowId, 2,
"flow1v2");
+ public void testDeleteSnapshots() {
+ createAndSaveSnapshot(fileSystemFlowProvider, 1, FIRST_VERSION);
+ createAndSaveSnapshot(fileSystemFlowProvider, 2, SECOND_VERSION);
- assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId,
1));
- assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId,
2));
+ assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID,
FLOW_ID, 1));
+ assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID,
FLOW_ID, 2));
- fileSystemFlowProvider.deleteAllFlowContent(bucketId, flowId);
+ fileSystemFlowProvider.deleteAllFlowContent(BUCKET_ID, FLOW_ID);
- assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 1));
- assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 2));
+ assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID,
1));
+ assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID,
2));
// delete a flow that doesn't exist
- fileSystemFlowProvider.deleteAllFlowContent(bucketId,
"some-other-flow");
+ fileSystemFlowProvider.deleteAllFlowContent(BUCKET_ID, SECOND_FLOW_ID);
// delete a bucket that doesn't exist
- fileSystemFlowProvider.deleteAllFlowContent("some-other-bucket",
flowId);
+ fileSystemFlowProvider.deleteAllFlowContent(SECOND_BUCKET_ID, FLOW_ID);
}
@Test
- public void testDeleteSnapshot() throws IOException {
- final String bucketId = "bucket1";
- final String flowId = "flow1";
-
- createAndSaveSnapshot(fileSystemFlowProvider, bucketId, flowId, 1,
"flow1v1");
- createAndSaveSnapshot(fileSystemFlowProvider, bucketId, flowId, 2,
"flow1v2");
+ public void testDeleteSnapshot() {
+ createAndSaveSnapshot(fileSystemFlowProvider, 1, FIRST_VERSION);
+ createAndSaveSnapshot(fileSystemFlowProvider, 2, SECOND_VERSION);
- assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId,
1));
- assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId,
2));
+ assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID,
FLOW_ID, 1));
+ assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID,
FLOW_ID, 2));
- fileSystemFlowProvider.deleteFlowContent(bucketId, flowId, 1);
+ fileSystemFlowProvider.deleteFlowContent(BUCKET_ID, FLOW_ID, 1);
- assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 1));
- assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId,
2));
+ assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID,
1));
+ assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID,
FLOW_ID, 2));
- fileSystemFlowProvider.deleteFlowContent(bucketId, flowId, 2);
+ fileSystemFlowProvider.deleteFlowContent(BUCKET_ID, FLOW_ID, 2);
- assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 1));
- assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 2));
+ assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID,
1));
+ assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID,
2));
// delete a version that doesn't exist
- fileSystemFlowProvider.deleteFlowContent(bucketId, flowId, 3);
+ fileSystemFlowProvider.deleteFlowContent(BUCKET_ID, FLOW_ID, 3);
// delete a flow that doesn't exist
- fileSystemFlowProvider.deleteFlowContent(bucketId, "some-other-flow",
1);
+ fileSystemFlowProvider.deleteFlowContent(BUCKET_ID, SECOND_FLOW_ID, 1);
// delete a bucket that doesn't exist
- fileSystemFlowProvider.deleteFlowContent("some-other-bucket", flowId,
1);
+ fileSystemFlowProvider.deleteFlowContent(SECOND_BUCKET_ID, FLOW_ID, 1);
}
- private void createAndSaveSnapshot(final FlowPersistenceProvider
flowPersistenceProvider, final String bucketId, final String flowId, final int
version,
- final String contentString) throws
IOException {
+ private void createAndSaveSnapshot(final FlowPersistenceProvider
flowPersistenceProvider, final int version, final String contentString) {
final FlowSnapshotContext context =
Mockito.mock(FlowSnapshotContext.class);
- when(context.getBucketId()).thenReturn(bucketId);
- when(context.getFlowId()).thenReturn(flowId);
+ when(context.getBucketId()).thenReturn(BUCKET_ID);
+ when(context.getFlowId()).thenReturn(FLOW_ID);
when(context.getVersion()).thenReturn(version);
final byte[] content = contentString.getBytes(StandardCharsets.UTF_8);
flowPersistenceProvider.saveFlowContent(context, content);
}
- private void verifySnapshot(final File flowStorageDir, final String
bucketId, final String flowId, final int version,
- final String contentString) throws IOException
{
+ private void verifySnapshot(final File flowStorageDir, final int version,
final String contentString) throws IOException {
// verify the correct snapshot file was created
final File flowSnapshotFile = new File(flowStorageDir,
- bucketId + "/" + flowId + "/" + version + "/" + version +
FileSystemFlowPersistenceProvider.SNAPSHOT_EXTENSION);
+ BUCKET_ID + "/" + FLOW_ID + "/" + version + "/" + version +
FileSystemFlowPersistenceProvider.SNAPSHOT_EXTENSION);
assertTrue(flowSnapshotFile.exists());
try (InputStream in = new FileInputStream(flowSnapshotFile)) {
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
index 5636424b29..e7ec904e75 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
@@ -46,12 +46,25 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
+import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
+ private static final String TEST_FLOWS_BUCKET = "test-flows";
+
+ private static final Set<String> FLOW_IDS = Set.of(
+ "first-flow",
+ "flow-with-invalid-connection",
+ "port-moved-groups",
+ "Parent",
+ "Child"
+ );
+
private final ObjectMapper objectMapper = new ObjectMapper();
{
@@ -77,14 +90,16 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
@Override
public boolean isStorageLocationApplicable(final
FlowRegistryClientConfigurationContext context, final String storageLocation) {
try {
- final File file = new java.io.File(URI.create(storageLocation));
- final Path path = file.toPath();
-
- final String configuredDirectory =
context.getProperty(DIRECTORY).getValue();
- final Path rootPath = Paths.get(configuredDirectory);
-
- // If this doesn't throw an Exception, the given storageLocation
is relative to the root path
- rootPath.relativize(path);
+ final Path rootPath =
getRootDirectory(context).toPath().normalize();
+ final URI location = URI.create(storageLocation);
+ final Path storageLocationPath =
Paths.get(location.getPath()).normalize();
+
+ if (storageLocationPath.startsWith(rootPath)) {
+ // If this doesn't throw an Exception, the given
storageLocation is relative to the root path
+
Objects.requireNonNull(rootPath.relativize(storageLocationPath));
+ } else {
+ return false;
+ }
} catch (final Exception e) {
return false;
}
@@ -100,8 +115,7 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
throw new IOException("Cannot get listing of directory " +
rootDir.getAbsolutePath());
}
- final Set<FlowRegistryBucket> buckets =
Arrays.stream(children).map(this::toBucket).collect(Collectors.toSet());
- return buckets;
+ return
Arrays.stream(children).map(this::toBucket).collect(Collectors.toSet());
}
private FlowRegistryBucket toBucket(final File file) {
@@ -130,17 +144,14 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
@Override
public FlowRegistryBucket getBucket(final
FlowRegistryClientConfigurationContext context, final String bucketId) {
final File rootDir = getRootDirectory(context);
- final File bucketDir = new File(rootDir, bucketId);
- final FlowRegistryBucket bucket = toBucket(bucketDir);
- return bucket;
+ final File bucketDir = getChildLocation(rootDir,
getValidatedBucketPath(bucketId));
+ return toBucket(bucketDir);
}
@Override
public RegisteredFlow registerFlow(final
FlowRegistryClientConfigurationContext context, final RegisteredFlow flow)
throws IOException {
- final File rootDir = getRootDirectory(context);
final String bucketId = flow.getBucketIdentifier();
- final File bucketDir = new File(rootDir, bucketId);
- final File flowDir = new File(bucketDir, flow.getIdentifier());
+ final File flowDir = getFlowDirectory(context, bucketId,
flow.getIdentifier());
Files.createDirectories(flowDir.toPath());
return flow;
@@ -148,9 +159,7 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
@Override
public RegisteredFlow deregisterFlow(final
FlowRegistryClientConfigurationContext context, final String bucketId, final
String flowId) throws IOException {
- final File rootDir = getRootDirectory(context);
- final File bucketDir = new File(rootDir, bucketId);
- final File flowDir = new File(bucketDir, flowId);
+ final File flowDir = getFlowDirectory(context, bucketId, flowId);
final File[] versionDirs = flowDir.listFiles();
@@ -167,9 +176,7 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
@Override
public RegisteredFlow getFlow(final FlowRegistryClientConfigurationContext
context, final String bucketId, final String flowId) {
- final File rootDir = getRootDirectory(context);
- final File bucketDir = new File(rootDir, bucketId);
- final File flowDir = new File(bucketDir, flowId);
+ final File flowDir = getFlowDirectory(context, bucketId, flowId);
final File[] versionDirs = flowDir.listFiles();
@@ -186,7 +193,7 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
@Override
public Set<RegisteredFlow> getFlows(final
FlowRegistryClientConfigurationContext context, final String bucketId) throws
IOException {
final File rootDir = getRootDirectory(context);
- final File bucketDir = new File(rootDir, bucketId);
+ final File bucketDir = getChildLocation(rootDir,
getValidatedBucketPath(bucketId));
final File[] flowDirs = bucketDir.listFiles();
if (flowDirs == null) {
throw new IOException("Could not get listing of directory " +
bucketDir);
@@ -203,16 +210,12 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
@Override
public RegisteredFlowSnapshot getFlowContents(final
FlowRegistryClientConfigurationContext context, final String bucketId, final
String flowId, final int version) throws IOException {
- final File rootDir = getRootDirectory(context);
- final File bucketDir = new File(rootDir, bucketId);
- final File flowDir = new File(bucketDir, flowId);
- final File versionDir = new File(flowDir, String.valueOf(version));
- final File snapshotFile = new File(versionDir, "snapshot.json");
-
+ final File flowDir = getFlowDirectory(context, bucketId, flowId);
final Pattern intPattern = Pattern.compile("\\d+");
final File[] versionFiles = flowDir.listFiles(file ->
intPattern.matcher(file.getName()).matches());
final JsonFactory factory = new JsonFactory(objectMapper);
+ final File snapshotFile = getSnapshotFile(context, bucketId, flowId,
version);
try (final JsonParser parser = factory.createParser(snapshotFile)) {
final RegisteredFlowSnapshot snapshot =
parser.readValueAs(RegisteredFlowSnapshot.class);
populateBucket(snapshot, bucketId);
@@ -264,22 +267,19 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
@Override
public RegisteredFlowSnapshot registerFlowSnapshot(final
FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot
flowSnapshot) throws IOException {
- final File rootDir = getRootDirectory(context);
final RegisteredFlowSnapshotMetadata metadata =
flowSnapshot.getSnapshotMetadata();
final String bucketId = metadata.getBucketIdentifier();
final String flowId = metadata.getFlowIdentifier();
+ final File flowDir = getFlowDirectory(context, bucketId, flowId);
final long version = metadata.getVersion();
-
- final File bucketDir = new File(rootDir, bucketId);
- final File flowDir = new File(bucketDir, flowId);
- final File versionDir = new File(flowDir, String.valueOf(version));
+ final File versionDir = getChildLocation(flowDir,
Paths.get(String.valueOf(version)));
// Create the directory for the version, if it doesn't exist.
if (!versionDir.exists()) {
Files.createDirectories(versionDir.toPath());
}
- final File snapshotFile = new File(versionDir, "snapshot.json");
+ final File snapshotFile = getSnapshotFile(context, bucketId, flowId,
version);
final RegisteredFlowSnapshot fullyPopulated =
fullyPopulate(flowSnapshot, flowDir);
final JsonFactory factory = new JsonFactory(objectMapper);
@@ -329,7 +329,7 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
flow.setPermissions(createAllowAllPermissions());
final File[] flowVersionDirs = flowDir.listFiles();
- final int versionCount = flowVersionDirs == null ? 0 :
flowVersionDirs.length;;
+ final int versionCount = flowVersionDirs == null ? 0 :
flowVersionDirs.length;
flow.setVersionCount(versionCount);
final RegisteredFlowVersionInfo versionInfo = new
RegisteredFlowVersionInfo();
@@ -353,9 +353,7 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
@Override
public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(final
FlowRegistryClientConfigurationContext context, final String bucketId, final
String flowId) throws IOException {
- final File rootDir = getRootDirectory(context);
- final File bucketDir = new File(rootDir, bucketId);
- final File flowDir = new File(bucketDir, flowId);
+ final File flowDir = getFlowDirectory(context, bucketId, flowId);
final File[] versionDirs = flowDir.listFiles();
if (versionDirs == null) {
throw new IOException("Could not list directories of " + flowDir);
@@ -379,9 +377,7 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
@Override
public int getLatestVersion(final FlowRegistryClientConfigurationContext
context, final String bucketId, final String flowId) throws IOException {
- final File rootDir = getRootDirectory(context);
- final File bucketDir = new File(rootDir, bucketId);
- final File flowDir = new File(bucketDir, flowId);
+ final File flowDir = getFlowDirectory(context, bucketId, flowId);
final File[] versionDirs = flowDir.listFiles();
if (versionDirs == null) {
throw new IOException("Cannot list directories of " + flowDir);
@@ -393,4 +389,46 @@ public class FileSystemFlowRegistryClient extends
AbstractFlowRegistryClient {
.max();
return greatestValue.orElse(-1);
}
+
+ private File getSnapshotFile(final FlowRegistryClientConfigurationContext
context, final String bucketId, final String flowId, final long version) {
+ final File flowDirectory = getFlowDirectory(context, bucketId, flowId);
+ final File versionDirectory = getChildLocation(flowDirectory,
Paths.get(String.valueOf(version)));
+ return new File(versionDirectory, "snapshot.json");
+ }
+
+ private File getFlowDirectory(final FlowRegistryClientConfigurationContext
context, final String bucketId, final String flowId) {
+ final File rootDir = getRootDirectory(context);
+ final File bucketDir = getChildLocation(rootDir,
getValidatedBucketPath(bucketId));
+ return getChildLocation(bucketDir, getFlowPath(flowId));
+ }
+
+ private File getChildLocation(final File parentDir, final Path
childLocation) {
+ final Path parentPath = parentDir.toPath().normalize();
+ final Path childPath = parentPath.resolve(childLocation.normalize());
+ if (childPath.startsWith(parentPath)) {
+ return childPath.toFile();
+ }
+ throw new IllegalArgumentException(String.format("Child location not
valid [%s]", childLocation));
+ }
+
+ private Path getFlowPath(final String flowId) {
+ final Optional<String> flowIdFound = FLOW_IDS.stream().filter(id ->
id.equals(flowId)).findFirst();
+ if (flowIdFound.isPresent()) {
+ return Paths.get(flowIdFound.get());
+ }
+
+ try {
+ final UUID flowIdentifier = UUID.fromString(flowId);
+ return Paths.get(flowIdentifier.toString());
+ } catch (final RuntimeException e) {
+ throw new IllegalArgumentException(String.format("Flow ID [%s] not
validated", flowId));
+ }
+ }
+
+ private Path getValidatedBucketPath(final String id) {
+ if (TEST_FLOWS_BUCKET.equals(id)) {
+ return Paths.get(TEST_FLOWS_BUCKET);
+ }
+ throw new IllegalArgumentException(String.format("Bucket [%s] not
validated", id));
+ }
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
index 13394efdda..36511c6f06 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
@@ -40,8 +40,6 @@ import org.apache.nifi.web.api.entity.SnippetEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -59,7 +57,9 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RegistryClientIT extends NiFiSystemIT {
- private static final Logger logger =
LoggerFactory.getLogger(RegistryClientIT.class);
+ private static final String TEST_FLOWS_BUCKET = "test-flows";
+
+ private static final String FIRST_FLOW_ID = "first-flow";
/**
* Test a scenario where we have Parent Process Group with a child process
group. The child group is under Version Control.
@@ -89,8 +89,8 @@ public class RegistryClientIT extends NiFiSystemIT {
final ProcessorEntity terminate =
util.createProcessor("TerminateFlowFile", parent.getId());
final ConnectionEntity connectionToTerminate =
util.createConnection(outputPort, terminate);
- final VersionControlInformationEntity childVci =
util.startVersionControl(child, clientEntity,
"testChangeVersionOnParentThatCascadesToChild", "Child");
- final VersionControlInformationEntity parentVci =
util.startVersionControl(parent, clientEntity,
"testChangeVersionOnParentThatCascadesToChild", "Parent");
+ final VersionControlInformationEntity childVci =
util.startVersionControl(child, clientEntity, TEST_FLOWS_BUCKET, "Child");
+ final VersionControlInformationEntity parentVci =
util.startVersionControl(parent, clientEntity, TEST_FLOWS_BUCKET, "Parent");
// Change the properties of the UpdateContent processor and commit as
v2
util.updateProcessorProperties(updateContents,
Collections.singletonMap("Content", "Updated v2"));
@@ -160,7 +160,7 @@ public class RegistryClientIT extends NiFiSystemIT {
final ConnectionEntity generateToCount =
util.createConnection(generate, countProcessor, "success");
// Save the flow as v1
- final VersionControlInformationEntity v1Vci =
util.startVersionControl(parent, clientEntity,
"testChangeConnectionDestinationRemoveOldAndMoveGroup", "Parent");
+ final VersionControlInformationEntity v1Vci =
util.startVersionControl(parent, clientEntity, TEST_FLOWS_BUCKET, "Parent");
// Create a Terminate processor and change flow to be:
// Generate -> Terminate - remove the old Count Processor
@@ -215,7 +215,7 @@ public class RegistryClientIT extends NiFiSystemIT {
util.createConnection(generate, countProcessor, "success");
// Save the flow as v1
- final VersionControlInformationEntity vci =
util.startVersionControl(group, clientEntity,
"testControllerServiceUpdateWhileRunning", "Parent");
+ final VersionControlInformationEntity vci =
util.startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET, "Parent");
// Change the value of of the Controller Service's start value to
2000, and change the text of the GenerateFlowFile just to make it run each time
the version is changed
util.updateControllerServiceProperties(service,
Collections.singletonMap("Start Value", "2000"));
@@ -259,7 +259,7 @@ public class RegistryClientIT extends NiFiSystemIT {
public void testChangeVersionWithPortMoveBetweenGroups() throws
NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new
File("src/test/resources/versioned-flows"));
- final ProcessGroupEntity imported =
getClientUtil().importFlowFromRegistry("root", clientEntity.getId(),
"test-flows", "port-moved-groups", 1);
+ final ProcessGroupEntity imported =
getClientUtil().importFlowFromRegistry("root", clientEntity.getId(),
TEST_FLOWS_BUCKET, "port-moved-groups", 1);
assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
@@ -306,7 +306,7 @@ public class RegistryClientIT extends NiFiSystemIT {
public void testRollbackOnFailure() throws NiFiClientException,
IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new
File("src/test/resources/versioned-flows"));
- final ProcessGroupEntity imported =
getClientUtil().importFlowFromRegistry("root", clientEntity.getId(),
"test-flows", "flow-with-invalid-connection", 1);
+ final ProcessGroupEntity imported =
getClientUtil().importFlowFromRegistry("root", clientEntity.getId(),
TEST_FLOWS_BUCKET, "flow-with-invalid-connection", 1);
assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
@@ -329,7 +329,7 @@ public class RegistryClientIT extends NiFiSystemIT {
final ProcessGroupEntity group =
getClientUtil().createProcessGroup("Outer", "root");
final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile", group.getId());
- final VersionControlInformationEntity vci =
getClientUtil().startVersionControl(group, clientEntity, "First Bucket", "First
Flow");
+ final VersionControlInformationEntity vci =
getClientUtil().startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET,
FIRST_FLOW_ID);
final ProcessGroupEntity imported =
getClientUtil().importFlowFromRegistry("root",
vci.getVersionControlInformation());
assertNotNull(imported);
@@ -352,7 +352,7 @@ public class RegistryClientIT extends NiFiSystemIT {
final ProcessGroupEntity group =
getClientUtil().createProcessGroup("Outer", "root");
final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile", group.getId());
- getClientUtil().startVersionControl(group, clientEntity, "First
Bucket", "First Flow");
+ getClientUtil().startVersionControl(group, clientEntity,
TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
String versionedFlowState =
getClientUtil().getVersionedFlowState(group.getId(), "root");
assertEquals("UP_TO_DATE", versionedFlowState);
@@ -379,13 +379,13 @@ public class RegistryClientIT extends NiFiSystemIT {
// Create a top-level PG and version it with nothing in it.
final FlowRegistryClientEntity clientEntity = registerClient();
final ProcessGroupEntity outerGroup =
getClientUtil().createProcessGroup("Outer", "root");
- getClientUtil().startVersionControl(outerGroup, clientEntity, "First
Bucket", "First Flow");
+ getClientUtil().startVersionControl(outerGroup, clientEntity,
TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
// Create a lower level PG and add a Processor.
// Commit as Version 2 of the group.
final ProcessGroupEntity inner1 =
getClientUtil().createProcessGroup("Inner 1", outerGroup.getId());
ProcessorEntity terminate1 =
getClientUtil().createProcessor("TerminateFlowFile", inner1.getId());
- VersionControlInformationEntity vciEntity =
getClientUtil().startVersionControl(outerGroup, clientEntity, "First Bucket",
"First Flow");
+ VersionControlInformationEntity vciEntity =
getClientUtil().startVersionControl(outerGroup, clientEntity,
TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
assertEquals(2, vciEntity.getVersionControlInformation().getVersion());
// Get an up-to-date copy of terminate1 because it should now have a
non-null versioned component id
@@ -407,7 +407,7 @@ public class RegistryClientIT extends NiFiSystemIT {
assertNotEquals(terminate1.getComponent().getVersionedComponentId(),
terminate2.getComponent().getVersionedComponentId());
// First Control again with the newly created components
- vciEntity = getClientUtil().startVersionControl(outerGroup,
clientEntity, "First Bucket", "First Flow");
+ vciEntity = getClientUtil().startVersionControl(outerGroup,
clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
assertEquals(3, vciEntity.getVersionControlInformation().getVersion());
// Get new version of terminate2 processor and terminate1 processor.
Ensure that both have version control ID's but that they are different.
@@ -429,7 +429,7 @@ public class RegistryClientIT extends NiFiSystemIT {
// Commit as Version 2 of the group.
final ProcessGroupEntity innerGroup =
getClientUtil().createProcessGroup("Inner 1", topLevel1.getId());
ProcessorEntity terminate1 =
getClientUtil().createProcessor("TerminateFlowFile", innerGroup.getId());
- VersionControlInformationEntity vciEntity =
getClientUtil().startVersionControl(innerGroup, clientEntity, "First Bucket",
"First Flow");
+ VersionControlInformationEntity vciEntity =
getClientUtil().startVersionControl(innerGroup, clientEntity,
TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
assertEquals(1, vciEntity.getVersionControlInformation().getVersion());
// Now that the inner group is under version control, copy it and
paste it to a new PG.
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
index 58add50ed9..5c6e0be50a 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java
@@ -740,7 +740,7 @@ public class StatelessBasicsIT extends NiFiSystemIT {
final FlowRegistryClientEntity registryClient = registerClient();
// Register the first version of the flow
- final VersionControlInformationEntity vci =
getClientUtil().startVersionControl(statelessGroup, registryClient, "First
Bucket", "testChangeFlowVersion");
+ final VersionControlInformationEntity vci =
getClientUtil().startVersionControl(statelessGroup, registryClient,
"test-flows", "first-flow");
waitFor(() ->
VersionControlInformationDTO.UP_TO_DATE.equals(getClientUtil().getVersionControlState(statelessGroup.getId()))
);
// Update the flow