Repository: nifi Updated Branches: refs/heads/master ba8f17bac -> b950eed1a
NIFI-4439: This closes #2190. When a Provenance Event File is rolled over, we were failing to close the resource before attempting to compress it. Fixed that. NIFI-4439: Addressed threading bug that can occur when rolling over provenance record writer Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b950eed1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b950eed1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b950eed1 Branch: refs/heads/master Commit: b950eed1a5a3c7c604ecc6c46edc397fda597cfe Parents: ba8f17b Author: Mark Payne <[email protected]> Authored: Tue Oct 3 09:44:54 2017 -0400 Committer: joewitt <[email protected]> Committed: Mon Oct 16 17:18:01 2017 -0400 ---------------------------------------------------------------------- .../store/WriteAheadStorePartition.java | 59 +++++++++++--------- 1 file changed, 34 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b950eed1/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index fde76f5..6c5cc8d 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -253,38 +253,47 @@ public class WriteAheadStorePartition implements EventStorePartition { final long nextEventId = idGenerator.get(); final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov"); final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true); - final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount()); - final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease); - if (updated) { - updatedWriter.writeHeader(nextEventId); + // Synchronize on the writer to ensure that no other thread is able to obtain the writer and start writing events to it until after it has + // been fully initialized (i.e., the header has been written, etc.) + synchronized (updatedWriter) { + final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount()); + final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease); - synchronized (minEventIdToPathMap) { - minEventIdToPathMap.put(nextEventId, updatedEventFile); - } + if (updated) { + if (lease != null) { + lease.close(); + } - if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) { - boolean offered = false; - while (!offered && !closed) { - try { - offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS); - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression"); + updatedWriter.writeHeader(nextEventId); + + synchronized (minEventIdToPathMap) { + minEventIdToPathMap.put(nextEventId, updatedEventFile); + } + + if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) { + boolean offered = false; + while (!offered && !closed) { + try { + offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression"); + } } } - } - return true; - } else { - try { - updatedWriter.close(); - } catch (final Exception e) { - logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e); - } + return true; + } else { + try { + updatedWriter.close(); + } catch (final Exception e) { + logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e); + } - updatedEventFile.delete(); - return false; + updatedEventFile.delete(); + return false; + } } }
