This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 58a25cf NIFI-6110: Updated StandardProcessSession such that if we
fail to update the FlowFile Repository, we do not decrement claimant counts for
any FlowFiles that were removed. Doing so can cause an issue where a FlowFile
is removed, then the FlowFileRepo update fails, resulting in the flowfile being
rolled back, but after its claimant count is decremented. It will then be
processed again, which can result in the same thing, and we'll end up
decrementing the claimant count rep [...]
58a25cf is described below
commit 58a25cfa5a9b9e4af9a5e9b7e5c3be9a676d8e87
Author: Mark Payne <[email protected]>
AuthorDate: Fri Mar 8 08:55:53 2019 -0500
NIFI-6110: Updated StandardProcessSession such that if we fail to update
the FlowFile Repository, we do not decrement claimant counts for any FlowFiles
that were removed. Doing so can cause an issue where a FlowFile is removed,
then the FlowFileRepo update fails, resulting in the flowfile being rolled
back, but after its claimant count is decremented. It will then be processed
again, which can result in the same thing, and we'll end up decrementing the
claimant count repeatedly. Also [...]
This closes #3358.
Signed-off-by: Bryan Bende <[email protected]>
---
.../apache/nifi/wali/LengthDelimitedJournal.java | 8 +-
.../nifi/wali/TestLengthDelimitedJournal.java | 93 +++++++++++++++++++---
.../src/test/java/org/wali/DummyRecordSerde.java | 14 ++--
.../repository/StandardProcessSession.java | 38 ++++-----
.../repository/TestStandardProcessSession.java | 55 +++++++++++++
5 files changed, 170 insertions(+), 38 deletions(-)
diff --git
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
index d9fdc97..df7868d 100644
---
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
+++
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
@@ -40,6 +40,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.text.DecimalFormat;
import java.util.Collection;
import java.util.HashMap;
@@ -218,6 +219,11 @@ public class LengthDelimitedJournal<T> implements
WriteAheadJournal<T> {
}
+ // Visible/overridable for testing.
+ protected void createOverflowDirectory(final Path path) throws IOException
{
+ Files.createDirectories(path);
+ }
+
@Override
public void update(final Collection<T> records, final RecordLookup<T>
recordLookup) throws IOException {
if (!headerWritten) {
@@ -246,7 +252,7 @@ public class LengthDelimitedJournal<T> implements
WriteAheadJournal<T> {
final int size = bados.getByteArrayOutputStream().size();
if (serde.isWriteExternalFileReferenceSupported() && size
> maxInHeapSerializationBytes) {
if (!overflowDirectory.exists()) {
- Files.createDirectory(overflowDirectory.toPath());
+
createOverflowDirectory(overflowDirectory.toPath());
}
// If we have exceeded our threshold for how much to
serialize in memory,
diff --git
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
index 94df890..448654e 100644
---
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
+++
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
@@ -17,16 +17,21 @@
package org.apache.nifi.wali;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.wali.DummyRecord;
+import org.wali.DummyRecordSerde;
+import org.wali.SerDeFactory;
+import org.wali.SingletonSerDeFactory;
+import org.wali.UpdateType;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -36,15 +41,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.wali.DummyRecord;
-import org.wali.DummyRecordSerde;
-import org.wali.SerDeFactory;
-import org.wali.SingletonSerDeFactory;
-import org.wali.UpdateType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class TestLengthDelimitedJournal {
private final File journalFile = new
File("target/testLengthDelimitedJournal/testJournal.journal");
@@ -308,6 +312,71 @@ public class TestLengthDelimitedJournal {
}
@Test
+ public void testMultipleThreadsCreatingOverflowDirectory() throws
IOException, InterruptedException {
+ final LengthDelimitedJournal<DummyRecord> journal = new
LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory, streamPool,
3820L, 100) {
+ @Override
+ protected void createOverflowDirectory(final Path path) throws
IOException {
+ // Create the overflow directory.
+ super.createOverflowDirectory(path);
+
+ // Ensure that a second call to create the overflow directory
will not cause an issue.
+ super.createOverflowDirectory(path);
+ }
+ };
+
+ // Ensure that the overflow directory does not exist.
+ journal.dispose();
+
+ try {
+ journal.writeHeader();
+
+ final List<DummyRecord> largeCollection1 = new ArrayList<>();
+ for (int i=0; i < 1_000; i++) {
+ largeCollection1.add(new DummyRecord(String.valueOf(i),
UpdateType.CREATE));
+ }
+ final Map<String, DummyRecord> recordMap =
largeCollection1.stream()
+ .collect(Collectors.toMap(DummyRecord::getId, rec -> rec));
+
+ final List<DummyRecord> largeCollection2 = new ArrayList<>();
+ for (int i=0; i < 1_000; i++) {
+ largeCollection2.add(new DummyRecord(String.valueOf(5_000_000 + i), UpdateType.CREATE));
+ }
+ final Map<String, DummyRecord> recordMap2 =
largeCollection2.stream()
+ .collect(Collectors.toMap(DummyRecord::getId, rec -> rec));
+
+ final AtomicReference<Exception> thread1Failure = new
AtomicReference<>();
+ final Thread t1 = new Thread(() -> {
+ try {
+ journal.update(largeCollection1, recordMap::get);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ thread1Failure.set(e);
+ }
+ });
+ t1.start();
+
+ final AtomicReference<Exception> thread2Failure = new
AtomicReference<>();
+ final Thread t2 = new Thread(() -> {
+ try {
+ journal.update(largeCollection2, recordMap2::get);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ thread2Failure.set(e);
+ }
+ });
+ t2.start();
+
+ t1.join();
+ t2.join();
+
+ assertNull(thread1Failure.get());
+ assertNull(thread2Failure.get());
+ } finally {
+ journal.close();
+ }
+ }
+
+ @Test
public void testTruncatedJournalFile() throws IOException {
final DummyRecord firstRecord, secondRecord;
try (final LengthDelimitedJournal<DummyRecord> journal = new
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
diff --git
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
index 9203493..cc8a4c1 100644
---
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
+++
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
@@ -43,7 +43,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
@SuppressWarnings("fallthrough")
@Override
- public void serializeEdit(final DummyRecord previousState, final
DummyRecord record, final DataOutputStream out) throws IOException {
+ public synchronized void serializeEdit(final DummyRecord previousState,
final DummyRecord record, final DataOutputStream out) throws IOException {
if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >=
throwIOEAfterNserializeEdits)) {
throw new IOException("Serialized " + (serializeEditCount - 1) + "
records successfully, so now it's time to throw IOE");
}
@@ -80,13 +80,13 @@ public class DummyRecordSerde implements SerDe<DummyRecord>
{
}
@Override
- public void serializeRecord(final DummyRecord record, final
DataOutputStream out) throws IOException {
+ public synchronized void serializeRecord(final DummyRecord record, final
DataOutputStream out) throws IOException {
serializeEdit(null, record, out);
}
@Override
@SuppressWarnings("fallthrough")
- public DummyRecord deserializeRecord(final DataInputStream in, final int
version) throws IOException {
+ public synchronized DummyRecord deserializeRecord(final DataInputStream
in, final int version) throws IOException {
if (externalRecords != null) {
final DummyRecord record = externalRecords.poll();
if (record != null) {
@@ -122,7 +122,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord>
{
}
@Override
- public boolean isMoreInExternalFile() {
+ public synchronized boolean isMoreInExternalFile() {
return externalRecords != null && !externalRecords.isEmpty();
}
@@ -189,11 +189,11 @@ public class DummyRecordSerde implements
SerDe<DummyRecord> {
return 1;
}
- public void setThrowIOEAfterNSerializeEdits(final int n) {
+ public synchronized void setThrowIOEAfterNSerializeEdits(final int n) {
this.throwIOEAfterNserializeEdits = n;
}
- public void setThrowOOMEAfterNSerializeEdits(final int n) {
+ public synchronized void setThrowOOMEAfterNSerializeEdits(final int n) {
this.throwOOMEAfterNserializeEdits = n;
}
@@ -208,7 +208,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord>
{
}
@Override
- public void writeExternalFileReference(final File externalFile, final
DataOutputStream out) throws IOException {
+ public synchronized void writeExternalFileReference(final File
externalFile, final DataOutputStream out) throws IOException {
out.write(EXTERNAL_FILE_INDICATOR);
out.writeUTF(externalFile.getAbsolutePath());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 604eb7e..7a87678 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -359,8 +359,25 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
final long updateProvenanceStart = System.nanoTime();
updateProvenanceRepo(checkpoint);
- final long claimRemovalStart = System.nanoTime();
- final long updateProvenanceNanos = claimRemovalStart -
updateProvenanceStart;
+ final long flowFileRepoUpdateStart = System.nanoTime();
+ final long updateProvenanceNanos = flowFileRepoUpdateStart -
updateProvenanceStart;
+
+ // Update the FlowFile Repository
+ try {
+ final Collection<StandardRepositoryRecord> repoRecords =
checkpoint.records.values();
+ context.getFlowFileRepository().updateRepository((Collection)
repoRecords);
+ } catch (final IOException ioe) {
+ // if we fail to commit the session, we need to roll back
+ // the checkpoints as well because none of the checkpoints
+ // were ever committed.
+ rollback(false, true);
+ throw new ProcessException("FlowFile Repository failed to
update", ioe);
+ }
+
+ final long flowFileRepoUpdateFinishNanos = System.nanoTime();
+ final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - flowFileRepoUpdateStart;
+
+ final long claimRemovalStart = flowFileRepoUpdateFinishNanos;
/**
* Figure out which content claims can be released. At this point,
@@ -401,25 +418,10 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
final long claimRemovalFinishNanos = System.nanoTime();
final long claimRemovalNanos = claimRemovalFinishNanos -
claimRemovalStart;
- // Update the FlowFile Repository
- try {
- final Collection<StandardRepositoryRecord> repoRecords =
checkpoint.records.values();
- context.getFlowFileRepository().updateRepository((Collection)
repoRecords);
- } catch (final IOException ioe) {
- // if we fail to commit the session, we need to roll back
- // the checkpoints as well because none of the checkpoints
- // were ever committed.
- rollback(false, true);
- throw new ProcessException("FlowFile Repository failed to
update", ioe);
- }
-
- final long flowFileRepoUpdateFinishNanos = System.nanoTime();
- final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
-
updateEventRepository(checkpoint);
final long updateEventRepositoryFinishNanos = System.nanoTime();
- final long updateEventRepositoryNanos =
updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos;
+ final long updateEventRepositoryNanos =
updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
// transfer the flowfiles to the connections' queues.
final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap =
new HashMap<>();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index d21cc5c..b001e79 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -92,6 +92,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.notNull;
@@ -325,6 +326,60 @@ public class TestStandardProcessSession {
@Test
+ public void testUpdateFlowFileRepoFailsOnSessionCommit() throws
IOException {
+ final ContentClaim contentClaim =
contentRepo.create("original".getBytes());
+
+ final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(8L)
+ .contentClaim(contentClaim)
+ .build();
+
+ flowFileQueue.put(flowFileRecord);
+
+ final Relationship relationship = new
Relationship.Builder().name("A").build();
+
+ FlowFile ff1 = session.get();
+ assertNotNull(ff1);
+
+ // Fork a child FlowFile.
+ final FlowFile child = session.create(flowFileRecord);
+ final FlowFile updated = session.write(flowFileRecord, out ->
out.write("update".getBytes()));
+ final ContentClaim updatedContentClaim = ((FlowFileRecord)
updated).getContentClaim();
+
+ session.remove(updated);
+ final FlowFile updatedChild = session.write(child, out ->
out.write("hello".getBytes(StandardCharsets.UTF_8)));
+ session.transfer(updatedChild, relationship);
+
+ final ContentClaim childContentClaim = ((FlowFileRecord)
updatedChild).getContentClaim();
+
+ flowFileRepo.setFailOnUpdate(true);
+
+ assertEquals(1, contentRepo.getClaimantCount(contentClaim));
+
+ // these will be the same content claim due to how the
StandardProcessSession adds multiple FlowFiles' contents to a single claim.
+ assertSame(updatedContentClaim, childContentClaim);
+ assertEquals(2, contentRepo.getClaimantCount(childContentClaim));
+
+ try {
+ session.commit();
+ Assert.fail("Expected session commit to fail");
+ } catch (final ProcessException pe) {
+ // Expected
+ }
+
+ // Ensure that if we fail to update the flowfile repo, that the
claimant count of the 'original' flowfile, which was removed, does not get
decremented.
+ assertEquals(1, contentRepo.getClaimantCount(contentClaim));
+ assertEquals(0, contentRepo.getClaimantCount(updatedContentClaim)); //
temporary claim should be cleaned up.
+ assertEquals(0, contentRepo.getClaimantCount(childContentClaim)); //
temporary claim should be cleaned up.
+
+ assertEquals(1, flowFileQueue.size().getObjectCount());
+ assertEquals(8L, flowFileQueue.size().getByteCount());
+ }
+
+ @Test
public void testCloneOriginalDataSmaller() throws IOException {
final byte[] originalContent = "hello".getBytes();
final byte[] replacementContent = "NEW DATA".getBytes();