This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1c2469bf33 NIFI-13213 Added Validation for Swap File Names This closes
8812
1c2469bf33 is described below
commit 1c2469bf33ffac1554dd418aadbfb1c7f2e98357
Author: exceptionfactory <[email protected]>
AuthorDate: Thu May 9 16:20:56 2024 -0500
NIFI-13213 Added Validation for Swap File Names
This closes 8812
Signed-off-by: Joseph Witt <[email protected]>
---
.../nifi/controller/FileSystemSwapManager.java | 24 ++++++++++--
.../nifi/controller/TestFileSystemSwapManager.java | 45 +++++++++++++++-------
2 files changed, 52 insertions(+), 17 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index b01bdc9f79..2b56cedb7b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -65,6 +65,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -79,6 +80,7 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
private static final Pattern SWAP_FILE_PATTERN =
Pattern.compile("\\d+-.+?(\\..*?)?\\.swap");
private static final Pattern TEMP_SWAP_FILE_PATTERN =
Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part");
+ private static final Pattern UUID_PATTERN =
Pattern.compile("([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})");
public static final String EVENT_CATEGORY = "Swap FlowFiles";
private static final Logger logger =
LoggerFactory.getLogger(FileSystemSwapManager.class);
@@ -133,11 +135,11 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
return null;
}
- final String swapFilePrefix = System.currentTimeMillis() + "-" +
flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString();
- final String swapFileBaseName = partitionName == null ? swapFilePrefix
: swapFilePrefix + "." + partitionName;
- final String swapFileName = swapFileBaseName + ".swap";
+ final String swapFileName =
getSwapFileName(flowFileQueue.getIdentifier(), partitionName);
+ final Path storageDirectoryPath = storageDirectory.toPath();
+ final Path swapFilePath =
storageDirectoryPath.resolve(swapFileName).toAbsolutePath();
- final File swapFile = new File(storageDirectory, swapFileName);
+ final File swapFile = swapFilePath.toFile();
final File swapTempFile = new File(swapFile.getParentFile(),
swapFile.getName() + ".part");
final String swapLocation = swapFile.getAbsolutePath();
@@ -482,4 +484,18 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
logger.debug("Changed Partition for Swap File by renaming from {} to
{}", swapLocation, newPartitionName);
return newFile.getAbsolutePath();
}
+
+ private String getSwapFileName(final String flowFileQueueIdentifier, final
String partitionName) {
+ final UUID identifier;
+ final Matcher identifierMatcher =
UUID_PATTERN.matcher(flowFileQueueIdentifier);
+ if (identifierMatcher.find()) {
+ identifier = UUID.fromString(identifierMatcher.group(1));
+ } else {
+ throw new IllegalArgumentException("FlowFile Queue Identifier [%s]
not valid".formatted(flowFileQueueIdentifier));
+ }
+
+ final String swapFilePrefix = System.currentTimeMillis() + "-" +
identifier + "-" + UUID.randomUUID();
+ final String swapFileBaseName = partitionName == null ? swapFilePrefix
: swapFilePrefix + "." + partitionName;
+ return swapFileBaseName + ".swap";
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index a00b2c01f8..cc91b405ae 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -27,7 +27,6 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mockito;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
@@ -41,24 +40,44 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestFileSystemSwapManager {
+ @Test
+ public void testFlowFileQueueIdentifierNotValid() {
+ final String identifier = "invalid-identifier";
+
+ final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
+ when(flowFileQueue.getIdentifier()).thenReturn(identifier);
+ final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
+ final FileSystemSwapManager swapManager =
createSwapManager(flowFileRepo);
+ final List<FlowFileRecord> flowFileRecords =
Collections.singletonList(new MockFlowFileRecord(0));
+
+ final IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class,
+ () -> swapManager.swapOut(flowFileRecords, flowFileQueue,
"partition-1"));
+
+ assertTrue(exception.getMessage().contains(identifier));
+ }
+
@Test
public void testBackwardCompatible() throws IOException {
- try (final InputStream fis = new FileInputStream(new
File("src/test/resources/old-swap-file.swap"));
- final DataInputStream in = new DataInputStream(new
BufferedInputStream(fis))) {
+ try (final InputStream fis = new
FileInputStream("src/test/resources/old-swap-file.swap");
+ final DataInputStream in = new DataInputStream(new
BufferedInputStream(fis))) {
- final FlowFileQueue flowFileQueue =
Mockito.mock(FlowFileQueue.class);
+ final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final FileSystemSwapManager swapManager = createSwapManager();
@@ -76,11 +95,11 @@ public class TestFileSystemSwapManager {
@Test
public void testFailureOnRepoSwapOut() throws IOException {
- final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
- final FlowFileRepository flowFileRepo =
Mockito.mock(FlowFileRepository.class);
- Mockito.doThrow(new IOException("Intentional IOException for unit
test"))
+ final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
+ doThrow(new IOException("Intentional IOException for unit test"))
.when(flowFileRepo).swapFlowFilesOut(any(), any(), any());
final FileSystemSwapManager swapManager =
createSwapManager(flowFileRepo);
@@ -96,7 +115,7 @@ public class TestFileSystemSwapManager {
@Test
public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException {
- final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("");
final File targetDir = new File("target/swap");
@@ -111,7 +130,7 @@ public class TestFileSystemSwapManager {
final FileSystemSwapManager swapManager = new
FileSystemSwapManager(Paths.get("target"));
final ResourceClaimManager resourceClaimManager = new
NopResourceClaimManager();
- final FlowFileRepository flowFileRepo =
Mockito.mock(FlowFileRepository.class);
+ final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
swapManager.initialize(new SwapManagerInitializationContext() {
@Override
@@ -134,7 +153,7 @@ public class TestFileSystemSwapManager {
final List<String> recoveredLocations =
swapManager.recoverSwapLocations(flowFileQueue, null);
assertEquals(1, recoveredLocations.size());
- final String firstLocation = recoveredLocations.get(0);
+ final String firstLocation = recoveredLocations.getFirst();
final SwapContents emptyContents = swapManager.swapIn(firstLocation,
flowFileQueue);
assertEquals(0, emptyContents.getFlowFiles().size());
@@ -144,8 +163,8 @@ public class TestFileSystemSwapManager {
assertEquals(10000, contents.getFlowFiles().size());
}
- private FileSystemSwapManager createSwapManager() throws IOException {
- final FlowFileRepository flowFileRepo =
Mockito.mock(FlowFileRepository.class);
+ private FileSystemSwapManager createSwapManager() {
+ final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
return createSwapManager(flowFileRepo);
}
@@ -175,7 +194,7 @@ public class TestFileSystemSwapManager {
return swapManager;
}
- public class NopResourceClaimManager implements ResourceClaimManager {
+ public static class NopResourceClaimManager implements
ResourceClaimManager {
@Override
public ResourceClaim newResourceClaim(String container, String
section, String id, boolean lossTolerant, boolean writable) {
return null;