Repository: flume Updated Branches: refs/heads/trunk d1f24f56c -> 0d437810d
FLUME-2786 FLUME-3056 FLUME-3117 Application enters a deadlock when stopped while handleConfigurationEvent Adding better locking mechanism to Application class to prevent deadlock. this closes #108 this closes #144 Revievers: Denes Arvay, Attila Simon, Benedict Jin, Ferenc Szabo (Andras Beni, Yan Jian via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0d437810 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0d437810 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0d437810 Branch: refs/heads/trunk Commit: 0d437810dc850192b48fa3b31608ffcd23b1f1e9 Parents: d1f24f5 Author: Andras Beni <[email protected]> Authored: Fri Mar 9 12:06:25 2018 +0100 Committer: Ferenc Szabo <[email protected]> Committed: Fri Mar 9 12:06:25 2018 +0100 ---------------------------------------------------------------------- .../java/org/apache/flume/node/Application.java | 45 +++++++++++++++----- ...lingPropertiesFileConfigurationProvider.java | 8 +++- .../org/apache/flume/node/TestApplication.java | 42 ++++++++++++++++++ .../test/resources/flume-conf.properties.2786 | 35 +++++++++++++++ 4 files changed, 117 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/main/java/org/apache/flume/node/Application.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index d6d92f0..7893fcc 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java @@ -52,6 +52,7 @@ import java.util.Locale; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; public class Application { @@ -65,6 +66,7 @@ public class Application { private final LifecycleSupervisor supervisor; private MaterializedConfiguration materializedConfiguration; private MonitorService monitorServer; + private final ReentrantLock lifecycleLock = new ReentrantLock(); public Application() { this(new ArrayList<LifecycleAware>(0)); @@ -75,23 +77,44 @@ public class Application { supervisor = new LifecycleSupervisor(); } - public synchronized void start() { - for (LifecycleAware component : components) { - supervisor.supervise(component, - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + public void start() { + lifecycleLock.lock(); + try { + for (LifecycleAware component : components) { + supervisor.supervise(component, + new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + } + } finally { + lifecycleLock.unlock(); } } @Subscribe - public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { - stopAllComponents(); - startAllComponents(conf); + public void handleConfigurationEvent(MaterializedConfiguration conf) { + try { + lifecycleLock.lockInterruptibly(); + stopAllComponents(); + startAllComponents(conf); + } catch (InterruptedException e) { + logger.info("Interrupted while trying to handle configuration event"); + return; + } finally { + // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock + if (lifecycleLock.isHeldByCurrentThread()) { + lifecycleLock.unlock(); + } + } } - public synchronized void stop() { - supervisor.stop(); - if (monitorServer != null) { - monitorServer.stop(); + public void stop() { + lifecycleLock.lock(); + try { + supervisor.stop(); + if (monitorServer != null) { + monitorServer.stop(); + } + } finally { + lifecycleLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java index 91a09f0..13cb38f 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java @@ -85,8 +85,12 @@ public class PollingPropertiesFileConfigurationProvider executorService.shutdown(); try { - while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { - LOGGER.debug("Waiting for file watcher to terminate"); + if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { + LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor."); + executorService.shutdownNow(); + while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { + LOGGER.debug("Waiting for file watcher to terminate"); + } } } catch (InterruptedException e) { LOGGER.debug("Interrupted while waiting for file watcher to terminate"); http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java index affbd8c..3853d50 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java @@ -153,4 +153,46 @@ public class TestApplication { application.stop(); } } + + @Test(timeout = 10000L) + public void testFLUME2786() throws Exception { + final String agentName = "test"; + final int interval = 1; + final long intervalMs = 1000L; + + File configFile = new File(baseDir, "flume-conf.properties"); + Files.copy(new File(getClass().getClassLoader() + .getResource("flume-conf.properties.2786").getFile()), configFile); + File mockConfigFile = spy(configFile); + when(mockConfigFile.lastModified()).then(new Answer<Long>() { + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(intervalMs); + return System.currentTimeMillis(); + } + }); + + EventBus eventBus = new EventBus(agentName + "-event-bus"); + PollingPropertiesFileConfigurationProvider configurationProvider = + new PollingPropertiesFileConfigurationProvider(agentName, + mockConfigFile, eventBus, interval); + PollingPropertiesFileConfigurationProvider mockConfigurationProvider = + spy(configurationProvider); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(intervalMs); + invocation.callRealMethod(); + return null; + } + }).when(mockConfigurationProvider).stop(); + + List<LifecycleAware> components = Lists.newArrayList(); + components.add(mockConfigurationProvider); + Application application = new Application(components); + eventBus.register(application); + application.start(); + Thread.sleep(1500L); + application.stop(); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/0d437810/flume-ng-node/src/test/resources/flume-conf.properties.2786 ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/resources/flume-conf.properties.2786 b/flume-ng-node/src/test/resources/flume-conf.properties.2786 new file mode 100755 index 0000000..2a7bea0 --- /dev/null +++ b/flume-ng-node/src/test/resources/flume-conf.properties.2786 @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Flume Configuration for testing FLUME-2786 +# + +test.sources = source1 +test.channels = channel1 +test.sinks = sink1 + +test.sources.source1.type = seq +test.sources.source1.totalEvents = 10000 +test.sources.source1.channels = channel1 + +test.channels.channel1.type = memory +test.channels.channel1.capacity = 10000 + +test.sinks.sink1.type = null +test.sinks.sink1.channel = channel1
