Merge branch 'develop' into NIFI-744
Conflicts:
nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fe25ae09
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fe25ae09
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fe25ae09
Branch: refs/heads/NIFI-744
Commit: fe25ae093f09d569c9a34fb4374c4b98c0734aa6
Parents: 05d4b06 15d4902
Author: Mark Payne <[email protected]>
Authored: Tue Aug 4 09:41:39 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Aug 4 09:41:39 2015 -0400
----------------------------------------------------------------------
.../authorization/DownloadAuthorization.java | 2 +-
nifi/nifi-bootstrap/pom.xml | 37 +-
.../nifi-hl7-query-language/pom.xml | 6 +-
.../org/wali/MinimalLockingWriteAheadLog.java | 15 +-
.../src/test/java/org/wali/DummyRecord.java | 5 +
.../test/java/org/wali/DummyRecordSerde.java | 8 +
.../wali/TestMinimalLockingWriteAheadLog.java | 215 +++++++
nifi/nifi-docs/pom.xml | 19 +-
.../nifi-aws-bundle/nifi-aws-processors/pom.xml | 4 -
.../nifi-flume-bundle/nifi-flume-nar/pom.xml | 3 +-
.../nifi-flume-processors/pom.xml | 32 +-
nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml | 29 +-
.../repository/StandardProcessSession.java | 5 +-
.../repository/io/LimitedInputStream.java | 7 +-
.../repository/io/TestLimitedInputStream.java | 122 ++++
.../repository/io/TestLimitedOutputStream.java | 76 ---
.../nifi-framework/nifi-site-to-site/pom.xml | 2 +-
.../src/main/webapp/WEB-INF/web.xml | 11 +-
.../src/main/webapp/js/nf/nf-common.js | 2 +-
.../nifi-geo-bundle/nifi-geo-nar/pom.xml | 2 +-
.../nifi-hdfs-processors/pom.xml | 4 +-
.../nifi-kafka-processors/pom.xml | 1 -
.../PersistentProvenanceRepository.java | 170 ++++--
.../nifi/provenance/StandardRecordReader.java | 8 +-
.../nifi/provenance/StandardRecordWriter.java | 24 +-
.../provenance/serialization/RecordWriter.java | 12 +-
.../TestPersistentProvenanceRepository.java | 120 +++-
.../nifi-twitter-processors/pom.xml | 2 -
.../nifi-standard-processors/pom.xml | 553 +++++++++----------
.../nifi-dbcp-service/pom.xml | 28 +-
.../nifi-http-context-map/pom.xml | 4 +-
.../nifi-standard-services-api-nar/pom.xml | 11 +-
32 files changed, 1017 insertions(+), 522 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe25ae09/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe25ae09/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --cc
nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 4408e3d,a1063f0..4d02d18
---
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@@ -1068,29 -1082,24 +1089,36 @@@ public class PersistentProvenanceReposi
final int journalCountThreshold = configuration.getJournalCount()
* 5;
final long sizeThreshold = (long)
(configuration.getMaxStorageCapacity() * 1.1D); // do not go over 10% of max
capacity
+ // check if we need to apply backpressure.
+ // If we have too many journal files, or if the repo becomes too
large, backpressure is necessary. Without it,
+ // if the rate at which provenance events are registered exceeds
the rate at which we can compress/merge/index them,
+ // then eventually we will end up with all of the data stored in
the 'journals' directory and not yet indexed. This
+ // would mean that the data would never even be accessible. In
order to prevent this, if we exceed 110% of the configured
+ // max capacity for the repo, or if we have 5 sets of journal
files waiting to be merged, we will block here until
+ // that is no longer the case.
if (journalFileCount > journalCountThreshold || repoSize >
sizeThreshold) {
logger.warn("The rate of the dataflow is exceeding the
provenance recording rate. "
- + "Slowing down flow to accomodate. Currently, there
are {} journal files ({} bytes) and "
+ + "Slowing down flow to accommodate. Currently, there
are {} journal files ({} bytes) and "
+ "threshold for blocking is {} ({} bytes)",
journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
eventReporter.reportEvent(Severity.WARNING, "Provenance
Repository", "The rate of the dataflow is "
- + "exceeding the provenance recording rate. Slowing
down flow to accomodate");
+ + "exceeding the provenance recording rate. Slowing
down flow to accommodate");
while (journalFileCount > journalCountThreshold || repoSize >
sizeThreshold) {
- try {
- Thread.sleep(1000L);
- } catch (final InterruptedException ie) {
+ if (repoSize > sizeThreshold) {
+ logger.debug("Provenance Repository has exceeded its
size threshold; will trigger purging of oldest events");
+ purgeOldEvents();
+
+ journalFileCount = getJournalCount();
+ repoSize = getSize(getLogFiles(), 0L);
+ continue;
+ } else {
+ // if we are constrained by the number of journal
files rather than the size of the repo,
+ // then we will just sleep a bit because another
thread is already actively merging the journals,
+ // due to the runnable that we scheduled above
+ try {
+ Thread.sleep(100L);
+ } catch (final InterruptedException ie) {
+ }
}
logger.debug("Provenance Repository is still behind.
Keeping flow slowed down "
@@@ -1428,25 -1414,13 +1480,32 @@@
recordToReaderMap.put(nextRecord, reader);
}
}
++
+ indexWriter.commit();
+ } catch (final Throwable t) {
+ indexWriter.rollback();
+ throw t;
+ } finally {
+ finishedAdding.set(true);
+ exec.shutdown();
+ }
+
+ for (final Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (final ExecutionException ee) {
+ final Throwable t = ee.getCause();
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+
+ throw new RuntimeException(t);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException("Thread interrupted");
+ }
}
+
+ indexConfig.setMaxIdIndexed(maxId);
} finally {
indexManager.returnIndexWriter(indexingDirectory,
indexWriter);
}