Repository: nifi
Updated Branches:
  refs/heads/master 68057cb4a -> 273e69f2c


NIFI-3274 Adding WriteAheadLog configuration options to 
WriteAheadLogLocalStateProvider

This closes #1386.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/273e69f2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/273e69f2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/273e69f2

Branch: refs/heads/master
Commit: 273e69f2cb833fd8cdcc99f70b8ab23fcc16d5f0
Parents: 68057cb
Author: jpercivall <[email protected]>
Authored: Tue Jan 3 18:13:54 2017 -0500
Committer: Pierre Villard <[email protected]>
Committed: Thu Jan 5 19:59:58 2017 +0100

----------------------------------------------------------------------
 .../local/WriteAheadLocalStateProvider.java     | 55 +++++++++++++++++---
 .../local/TestWriteAheadLocalStateProvider.java |  3 ++
 .../main/resources/conf/state-management.xml    |  8 +++
 3 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
index fc691fb..e341c4c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
@@ -54,6 +54,8 @@ import org.wali.WriteAheadRepository;
 public class WriteAheadLocalStateProvider extends AbstractStateProvider {
     private static final Logger logger = 
LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
 
+    private volatile boolean alwaysSync;
+
     private final StateMapSerDe serde;
     private final ConcurrentMap<String, ComponentProvider> componentProviders 
= new ConcurrentHashMap<>();
     private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory());
@@ -66,6 +68,33 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
         .required(true)
         .build();
 
+    static final PropertyDescriptor ALWAYS_SYNC = new 
PropertyDescriptor.Builder()
+        .name("Always Sync")
+        .description("If set to true, any change to the repository will be 
synchronized to the disk, meaning that NiFi will ask the operating system not 
to cache the information. This is very " +
+                "expensive and can significantly reduce NiFi performance. 
However, if it is false, there could be the potential for data loss if either 
there is a sudden power loss or the " +
+                "operating system crashes. The default value is false.")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor NUM_PARTITIONS = new 
PropertyDescriptor.Builder()
+        .name("Partitions")
+        .description("The number of partitions.")
+        .addValidator(StandardValidators.createLongValidator(1, 
Integer.MAX_VALUE, true))
+        .defaultValue("16")
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor CHECKPOINT_INTERVAL = new 
PropertyDescriptor.Builder()
+        .name("Checkpoint Interval")
+        .description("The amount of time between checkpoints.")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("2 mins")
+        .required(true)
+        .build();
+
+
     private WriteAheadRepository<StateMapUpdate> writeAheadLog;
     private AtomicLong versionGenerator;
 
@@ -75,6 +104,11 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
 
     @Override
     public synchronized void init(final StateProviderInitializationContext 
context) throws IOException {
+        long checkpointIntervalMillis = 
context.getProperty(CHECKPOINT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+        int numPartitions = context.getProperty(NUM_PARTITIONS).asInteger();
+        alwaysSync = context.getProperty(ALWAYS_SYNC).asBoolean();
+
+
         final File basePath = new File(context.getProperty(PATH).getValue());
 
         if (!basePath.exists() && !basePath.mkdirs()) {
@@ -94,7 +128,7 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
         }
 
         versionGenerator = new AtomicLong(-1L);
-        writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), 
16, serde, null);
+        writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), 
numPartitions, serde, null);
 
         final Collection<StateMapUpdate> updates = 
writeAheadLog.recoverRecords();
         long maxRecordVersion = -1L;
@@ -110,7 +144,7 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
             }
 
             final String componentId = update.getComponentId();
-            componentProviders.put(componentId, new 
ComponentProvider(writeAheadLog, versionGenerator, componentId, 
update.getStateMap()));
+            componentProviders.put(componentId, new 
ComponentProvider(writeAheadLog, versionGenerator, componentId, 
update.getStateMap(), alwaysSync));
         }
 
         // keep a separate maxRecordVersion and set it at the end so that we 
don't have to continually update an AtomicLong, which is more
@@ -118,13 +152,16 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
         // the init() method completes, this is okay to do.
         versionGenerator.set(maxRecordVersion);
 
-        executor.scheduleWithFixedDelay(new CheckpointTask(), 2, 2, 
TimeUnit.MINUTES);
+        executor.scheduleWithFixedDelay(new CheckpointTask(), 
checkpointIntervalMillis, checkpointIntervalMillis, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(PATH);
+        properties.add(ALWAYS_SYNC);
+        properties.add(CHECKPOINT_INTERVAL);
+        properties.add(NUM_PARTITIONS);
         return properties;
     }
 
@@ -144,7 +181,7 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
         ComponentProvider componentProvider = 
componentProviders.get(componentId);
         if (componentProvider == null) {
             final StateMap stateMap = new 
StandardStateMap(Collections.<String, String> emptyMap(), -1L);
-            componentProvider = new ComponentProvider(writeAheadLog, 
versionGenerator, componentId, stateMap);
+            componentProvider = new ComponentProvider(writeAheadLog, 
versionGenerator, componentId, stateMap, alwaysSync);
 
             final ComponentProvider existingComponentProvider = 
componentProviders.putIfAbsent(componentId, componentProvider);
             if (existingComponentProvider != null) {
@@ -190,14 +227,16 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
         private final AtomicLong versionGenerator;
         private final WriteAheadRepository<StateMapUpdate> wal;
         private final String componentId;
+        private final boolean alwaysSync;
 
         private StateMap stateMap;
 
-        public ComponentProvider(final WriteAheadRepository<StateMapUpdate> 
wal, final AtomicLong versionGenerator, final String componentId, final 
StateMap stateMap) {
+        public ComponentProvider(final WriteAheadRepository<StateMapUpdate> 
wal, final AtomicLong versionGenerator, final String componentId, final 
StateMap stateMap, final boolean alwaysSync) {
             this.wal = wal;
             this.versionGenerator = versionGenerator;
             this.componentId = componentId;
             this.stateMap = stateMap;
+            this.alwaysSync = alwaysSync;
         }
 
         public synchronized StateMap getState() throws IOException {
@@ -211,7 +250,7 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
         public synchronized void setState(final Map<String, String> state) 
throws IOException {
             stateMap = new StandardStateMap(state, 
versionGenerator.incrementAndGet());
             final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, 
componentId, UpdateType.UPDATE);
-            wal.update(Collections.singleton(updateRecord), false);
+            wal.update(Collections.singleton(updateRecord), alwaysSync);
         }
 
         // see above explanation as to why this method is synchronized.
@@ -227,14 +266,14 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
 
             stateMap = new StandardStateMap(new HashMap<>(newValue), 
versionGenerator.incrementAndGet());
             final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, 
componentId, UpdateType.UPDATE);
-            wal.update(Collections.singleton(updateRecord), false);
+            wal.update(Collections.singleton(updateRecord), alwaysSync);
             return true;
         }
 
         public synchronized void clear() throws IOException {
             stateMap = new StandardStateMap(null, 
versionGenerator.incrementAndGet());
             final StateMapUpdate update = new StateMapUpdate(stateMap, 
componentId, UpdateType.UPDATE);
-            wal.update(Collections.singleton(update), false);
+            wal.update(Collections.singleton(update), alwaysSync);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
index 3a6310f..ac0b030 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
@@ -45,6 +45,9 @@ public class TestWriteAheadLocalStateProvider extends 
AbstractTestStateProvider
         provider = new WriteAheadLocalStateProvider();
         final Map<PropertyDescriptor, PropertyValue> properties = new 
HashMap<>();
         properties.put(WriteAheadLocalStateProvider.PATH, new 
StandardPropertyValue("target/local-state-provider/" + 
UUID.randomUUID().toString(), null));
+        properties.put(WriteAheadLocalStateProvider.ALWAYS_SYNC, new 
StandardPropertyValue("false", null));
+        properties.put(WriteAheadLocalStateProvider.CHECKPOINT_INTERVAL, new 
StandardPropertyValue("2 mins", null));
+        properties.put(WriteAheadLocalStateProvider.NUM_PARTITIONS, new 
StandardPropertyValue("16", null));
 
         provider.initialize(new StateProviderInitializationContext() {
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index d4a13cf..d7631c2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -25,11 +25,19 @@
         
         Directory - the directory to store components' state in. If the 
directory being used is a sub-directory of the NiFi installation, it
                     is important that the directory be copied over to the new 
version when upgrading NiFi.
+        Always Sync - If set to true, any change to the repository will be 
synchronized to the disk, meaning that NiFi will ask the operating system not 
to cache the information. This is very
+                expensive and can significantly reduce NiFi performance. 
However, if it is false, there could be the potential for data loss if either 
there is a sudden power loss or the
+                operating system crashes. The default value is false.
+        Partitions - The number of partitions.
+        Checkpoint Interval - The amount of time between checkpoints.
      -->
     <local-provider>
         <id>local-provider</id>
         
<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
         <property name="Directory">./state/local</property>
+        <property name="Always Sync">false</property>
+        <property name="Partitions">16</property>
+        <property name="Checkpoint Interval">2 mins</property>
     </local-provider>
 
     <!--

Reply via email to