Repository: nifi
Updated Branches:
  refs/heads/master 310347fd6 -> 4baffacc4


NIFI-892: If nifi.flowfile.repository.partitions property is changed, but 
repository already exists, just previous value


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4baffacc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4baffacc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4baffacc

Branch: refs/heads/master
Commit: 4baffacc42d1201e4cbdbcb1601bfa810a2e9267
Parents: 310347f
Author: Mark Payne <[email protected]>
Authored: Mon Aug 24 16:30:30 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Aug 25 09:58:37 2015 -0400

----------------------------------------------------------------------
 .../org/wali/MinimalLockingWriteAheadLog.java   |  6 ++--
 .../wali/TestMinimalLockingWriteAheadLog.java   | 34 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4baffacc/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index a5889ed..5a7656d 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -130,6 +130,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
             throw new IllegalArgumentException("Paths must be non-empty");
         }
 
+        int resolvedPartitionCount = partitionCount;
         int existingPartitions = 0;
         for (final Path path : paths) {
             if (!Files.exists(path)) {
@@ -162,6 +163,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
                     logger.warn("Constructing MinimalLockingWriteAheadLog with 
partitionCount={}, but the repository currently has "
                             + "{} partitions; ignoring argument and proceeding 
with {} partitions",
                             new Object[]{partitionCount, existingPartitions, 
existingPartitions});
+                    resolvedPartitionCount = existingPartitions;
                 }
             }
         }
@@ -175,10 +177,10 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         lockChannel = new FileOutputStream(lockPath.toFile()).getChannel();
         lockChannel.lock();
 
-        partitions = new Partition[partitionCount];
+        partitions = new Partition[resolvedPartitionCount];
 
         Iterator<Path> pathIterator = paths.iterator();
-        for (int i = 0; i < partitionCount; i++) {
+        for (int i = 0; i < resolvedPartitionCount; i++) {
             // If we're out of paths, create a new iterator to start over.
             if (!pathIterator.hasNext()) {
                 pathIterator = paths.iterator();

http://git-wip-us.apache.org/repos/asf/nifi/blob/4baffacc/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index 29d2e7f..03e6581 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -417,6 +417,40 @@ public class TestMinimalLockingWriteAheadLog {
 
     }
 
+
+    @Test
+    public void testDecreaseNumberOfPartitions() throws IOException {
+        final Path path = 
Paths.get("target/minimal-locking-repo-decrease-partitions");
+        deleteRecursively(path.toFile());
+        Files.createDirectories(path);
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> writeRepo = new 
MinimalLockingWriteAheadLog<>(path, 256, serde, null);
+        final Collection<DummyRecord> initialRecs = writeRepo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        final DummyRecord record1 = new DummyRecord("1", UpdateType.CREATE);
+        writeRepo.update(Collections.singleton(record1), false);
+
+        for (int i=0; i < 8; i++) {
+            final DummyRecord r = new DummyRecord("1", UpdateType.UPDATE);
+            r.setProperty("i", String.valueOf(i));
+            writeRepo.update(Collections.singleton(r), false);
+        }
+
+        writeRepo.shutdown();
+
+        final WriteAheadRepository<DummyRecord> recoverRepo = new 
MinimalLockingWriteAheadLog<>(path, 6, serde, null);
+        final Collection<DummyRecord> records = recoverRepo.recoverRecords();
+        final List<DummyRecord> list = new ArrayList<>(records);
+        assertEquals(1, list.size());
+
+        final DummyRecord recoveredRecord = list.get(0);
+        assertEquals("1", recoveredRecord.getId());
+        assertEquals("7",recoveredRecord.getProperty("i"));
+    }
+
+
     private static class InsertThread extends Thread {
 
         private final List<List<DummyRecord>> records;

Reply via email to