Repository: nifi Updated Branches: refs/heads/master 68057cb4a -> 273e69f2c
NIFI-3274 Adding WriteAheadLog configuration options to WriteAheadLogLocalStateProvider This closes #1386. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/273e69f2 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/273e69f2 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/273e69f2 Branch: refs/heads/master Commit: 273e69f2cb833fd8cdcc99f70b8ab23fcc16d5f0 Parents: 68057cb Author: jpercivall <[email protected]> Authored: Tue Jan 3 18:13:54 2017 -0500 Committer: Pierre Villard <[email protected]> Committed: Thu Jan 5 19:59:58 2017 +0100 ---------------------------------------------------------------------- .../local/WriteAheadLocalStateProvider.java | 55 +++++++++++++++++--- .../local/TestWriteAheadLocalStateProvider.java | 3 ++ .../main/resources/conf/state-management.xml | 8 +++ 3 files changed, 58 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java index fc691fb..e341c4c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java @@ -54,6 +54,8 @@ import org.wali.WriteAheadRepository; public class WriteAheadLocalStateProvider extends AbstractStateProvider { private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class); + private volatile boolean alwaysSync; + private final StateMapSerDe serde; private final ConcurrentMap<String, ComponentProvider> componentProviders = new ConcurrentHashMap<>(); private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory()); @@ -66,6 +68,33 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { .required(true) .build(); + static final PropertyDescriptor ALWAYS_SYNC = new PropertyDescriptor.Builder() + .name("Always Sync") + .description("If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very " + + "expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the " + + "operating system crashes. The default value is false.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder() + .name("Partitions") + .description("The number of partitions.") + .addValidator(StandardValidators.createLongValidator(1, Integer.MAX_VALUE, true)) + .defaultValue("16") + .required(true) + .build(); + + static final PropertyDescriptor CHECKPOINT_INTERVAL = new PropertyDescriptor.Builder() + .name("Checkpoint Interval") + .description("The amount of time between checkpoints.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("2 mins") + .required(true) + .build(); + + private WriteAheadRepository<StateMapUpdate> writeAheadLog; private AtomicLong versionGenerator; @@ -75,6 +104,11 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { @Override public synchronized void init(final StateProviderInitializationContext context) throws IOException { + long checkpointIntervalMillis = context.getProperty(CHECKPOINT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS); + int numPartitions = context.getProperty(NUM_PARTITIONS).asInteger(); + alwaysSync = context.getProperty(ALWAYS_SYNC).asBoolean(); + + final File basePath = new File(context.getProperty(PATH).getValue()); if (!basePath.exists() && !basePath.mkdirs()) { @@ -94,7 +128,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { } versionGenerator = new AtomicLong(-1L); - writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), 16, serde, null); + writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), numPartitions, serde, null); final Collection<StateMapUpdate> updates = writeAheadLog.recoverRecords(); long maxRecordVersion = -1L; @@ -110,7 +144,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { } final String componentId = update.getComponentId(); - componentProviders.put(componentId, new ComponentProvider(writeAheadLog, versionGenerator, componentId, update.getStateMap())); + componentProviders.put(componentId, new ComponentProvider(writeAheadLog, versionGenerator, componentId, update.getStateMap(), alwaysSync)); } // keep a separate maxRecordVersion and set it at the end so that we don't have to continually update an AtomicLong, which is more @@ -118,13 +152,16 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { // the init() method completes, this is okay to do. versionGenerator.set(maxRecordVersion); - executor.scheduleWithFixedDelay(new CheckpointTask(), 2, 2, TimeUnit.MINUTES); + executor.scheduleWithFixedDelay(new CheckpointTask(), checkpointIntervalMillis, checkpointIntervalMillis, TimeUnit.MILLISECONDS); } @Override public List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(PATH); + properties.add(ALWAYS_SYNC); + properties.add(CHECKPOINT_INTERVAL); + properties.add(NUM_PARTITIONS); return properties; } @@ -144,7 +181,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { ComponentProvider componentProvider = componentProviders.get(componentId); if (componentProvider == null) { final StateMap stateMap = new StandardStateMap(Collections.<String, String> emptyMap(), -1L); - componentProvider = new ComponentProvider(writeAheadLog, versionGenerator, componentId, stateMap); + componentProvider = new ComponentProvider(writeAheadLog, versionGenerator, componentId, stateMap, alwaysSync); final ComponentProvider existingComponentProvider = componentProviders.putIfAbsent(componentId, componentProvider); if (existingComponentProvider != null) { @@ -190,14 +227,16 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { private final AtomicLong versionGenerator; private final WriteAheadRepository<StateMapUpdate> wal; private final String componentId; + private final boolean alwaysSync; private StateMap stateMap; - public ComponentProvider(final WriteAheadRepository<StateMapUpdate> wal, final AtomicLong versionGenerator, final String componentId, final StateMap stateMap) { + public ComponentProvider(final WriteAheadRepository<StateMapUpdate> wal, final AtomicLong versionGenerator, final String componentId, final StateMap stateMap, final boolean alwaysSync) { this.wal = wal; this.versionGenerator = versionGenerator; this.componentId = componentId; this.stateMap = stateMap; + this.alwaysSync = alwaysSync; } public synchronized StateMap getState() throws IOException { @@ -211,7 +250,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { public synchronized void setState(final Map<String, String> state) throws IOException { stateMap = new StandardStateMap(state, versionGenerator.incrementAndGet()); final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE); - wal.update(Collections.singleton(updateRecord), false); + wal.update(Collections.singleton(updateRecord), alwaysSync); } // see above explanation as to why this method is synchronized. @@ -227,14 +266,14 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { stateMap = new StandardStateMap(new HashMap<>(newValue), versionGenerator.incrementAndGet()); final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE); - wal.update(Collections.singleton(updateRecord), false); + wal.update(Collections.singleton(updateRecord), alwaysSync); return true; } public synchronized void clear() throws IOException { stateMap = new StandardStateMap(null, versionGenerator.incrementAndGet()); final StateMapUpdate update = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE); - wal.update(Collections.singleton(update), false); + wal.update(Collections.singleton(update), alwaysSync); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java index 3a6310f..ac0b030 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java @@ -45,6 +45,9 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider provider = new WriteAheadLocalStateProvider(); final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>(); properties.put(WriteAheadLocalStateProvider.PATH, new StandardPropertyValue("target/local-state-provider/" + UUID.randomUUID().toString(), null)); + properties.put(WriteAheadLocalStateProvider.ALWAYS_SYNC, new StandardPropertyValue("false", null)); + properties.put(WriteAheadLocalStateProvider.CHECKPOINT_INTERVAL, new StandardPropertyValue("2 mins", null)); + properties.put(WriteAheadLocalStateProvider.NUM_PARTITIONS, new StandardPropertyValue("16", null)); provider.initialize(new StateProviderInitializationContext() { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml index d4a13cf..d7631c2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml @@ -25,11 +25,19 @@ Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it is important that the directory be copied over to the new version when upgrading NiFi. + Always Sync - If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very + expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the + operating system crashes. The default value is false. + Partitions - The number of partitions. + Checkpoint Interval - The amount of time between checkpoints. --> <local-provider> <id>local-provider</id> <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class> <property name="Directory">./state/local</property> + <property name="Always Sync">false</property> + <property name="Partitions">16</property> + <property name="Checkpoint Interval">2 mins</property> </local-provider> <!--
