[ 
https://issues.apache.org/jira/browse/FLUME-1232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13287619#comment-13287619
 ] 

Will McQueen commented on FLUME-1232:
-------------------------------------

Based on additional testing, the locking issue seems to be related to having a 
'timestamp' header in the events. The issue is not reproducible when I remove 
the TimestampInterceptor. The issue is also not reproducible when I add a 
different interceptor that adds some header other than 'timestamp' (e.g., 
'blah' = 'BBB'). I don't think the issue is related specifically to 
interceptors either, just to the 'timestamp' header in events. The steps to 
reproduce are as follows. I recommend doing 'tail -f' on the flume log file to 
watch the progress, and also setting flume.root.logger=DEBUG,LOGFILE in your 
log4j.properties file.
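
If watching the tail output by eye is inconvenient, the failure can also be 
found by scanning the log for the exception message that appears at the end of 
the replay. The following is only a rough sketch and not part of Flume: the 
class name ReplayFailureScan and its method are made up for illustration, and 
the log path is whatever your setup uses. The message text is the one shown in 
the log output in this comment.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: looks for the replay failure message in log lines. */
final class ReplayFailureScan {

  // Message text of the IllegalStateException shown in this comment.
  private static final Pattern PENDING_TAKES =
      Pattern.compile("Pending takes (\\d+) exist after the end of replay");

  /** Returns the pending-take count from the first matching line, if any. */
  static OptionalInt firstPendingTakes(List<String> lines) {
    for (String line : lines) {
      Matcher m = PENDING_TAKES.matcher(line);
      if (m.find()) {
        return OptionalInt.of(Integer.parseInt(m.group(1)));
      }
    }
    return OptionalInt.empty();
  }

  public static void main(String[] args) throws IOException {
    // args[0] is the path to the flume log file (depends on your setup).
    List<String> lines = Files.readAllLines(Paths.get(args[0]));
    OptionalInt pending = firstPendingTakes(lines);
    System.out.println(pending.isPresent()
        ? "replay failed, pending takes = " + pending.getAsInt()
        : "no replay failure found");
  }
}
```

Run it against the log file after step 7; it only reports the first match, 
which is enough to tell whether the replay failed.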

1) Remove the checkpoint and data dirs
2) Start flume with the config file specified in the previous comment. Confirm 
that lock file exists in checkpoint dir and data dir.
3) Wait 30 seconds to allow data to flow for some time (this will also increase 
the amount of time that the replay takes during the next run)
4) Stop flume. Confirm that lock file does not exist in checkpoint dir and data 
dir.
5) Start flume. Confirm that lock file exists in checkpoint dir and data dir. 
The replay mechanism will begin (as seen by tailing flume.log).
6) Stop flume while the replay is still going on (this is important). In other 
words, stop flume within a few seconds of starting it in step 5.
7) Now, start flume one last time. Tail the logfile. When the replay completes, 
you should now see the exceptions appear:

2012-06-01 12:04:23,744 DEBUG file.ReplayHandler: Pending take 
FlumeEventPointer [fileID=1, offset=1937522]
2012-06-01 12:04:23,744 DEBUG file.ReplayHandler: Pending take 
FlumeEventPointer [fileID=1, offset=1937685]
2012-06-01 12:04:23,744 DEBUG file.ReplayHandler: Pending take 
FlumeEventPointer [fileID=1, offset=1937848]
2012-06-01 12:04:23,744 ERROR file.Log: Failed to initialize Log
java.lang.IllegalStateException: Pending takes 299 exist after the end of replay
        at 
com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at 
org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:137)
        at org.apache.flume.channel.file.Log.replay(Log.java:205)
        at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:180)
        at 
org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at 
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
2012-06-01 12:04:23,745 ERROR lifecycle.LifecycleSupervisor: Unable to start 
org.apache.flume.channel.file.FileChannel@7cf1bb78 - Exception follows.
java.lang.IllegalStateException: Log is closed
        at 
com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.file.Log.getFlumeEventQueue(Log.java:226)
        at 
org.apache.flume.channel.file.FileChannel.getDepth(FileChannel.java:253)
        at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:187)
        at 
org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at 
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
2012-06-01 12:04:23,746 ERROR flume.SinkRunner: Unhandled exception, logging 
and sleeping for 5000ms
java.lang.IllegalStateException: Channel closed
        at 
com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at 
org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:237)
        at 
org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:118)
        at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:61)
        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
2012-06-01 12:04:23,746 ERROR source.ExecSource: Failed while running command: 
/usr/bin/top -b -d 1
java.lang.IllegalStateException: Channel closed
        at 
com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at 
org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:237)
        at 
org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:118)
        at 
org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:248)
        at 
org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:258)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
2012-06-01 12:04:23,747 INFO source.ExecSource: Restarting in 1000ms, exit code 
143
2012-06-01 12:04:25,267 ERROR source.ExecSource: Failed while running command: 
/usr/bin/top -b -d 1
java.lang.IllegalStateException: Channel closed
        at 
com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at 
org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:237)
        at 
org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:118)
        at 
org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:248)
        at 
org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:258)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
2012-06-01 12:04:25,268 INFO source.ExecSource: Restarting in 1000ms, exit code 
141
2012-06-01 12:04:26,746 INFO file.FileChannel: Starting FileChannel with 
dataDir [/tmp/flume-data]
2012-06-01 12:04:26,746 INFO file.Log: Cannot lock /tmp/flume-check. The 
directory is already locked.
2012-06-01 12:04:26,747 ERROR lifecycle.LifecycleSupervisor: Unable to start 
org.apache.flume.channel.file.FileChannel@7cf1bb78 - Exception follows.
java.lang.RuntimeException: java.io.IOException: Cannot lock /tmp/flume-check. 
The directory is already locked.
        at com.google.common.base.Throwables.propagate(Throwables.java:156)
        at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:182)
        at 
org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at 
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Cannot lock /tmp/flume-check. The directory is 
already locked.
        at org.apache.flume.channel.file.Log.lock(Log.java:574)
        at org.apache.flume.channel.file.Log.<init>(Log.java:95)
        at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:178)
        ... 10 more
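
Steps 2, 4 and 5 ask you to confirm whether the lock file exists in the 
checkpoint dir and the data dir. A small check like the one below can do that 
in one go. This is a sketch under assumptions, not Flume code: the class name 
LockFileCheck is made up, and the lock file name "in_use.lock" is my 
assumption, so verify it against the file your Flume version actually 
creates. The default directories are the ones that appear in the log above.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: reports whether a lock file is present in each dir. */
final class LockFileCheck {

  // Assumption: the lock file name; check what your Flume version creates.
  static final String ASSUMED_LOCK_NAME = "in_use.lock";

  /** True if a regular file named lockName exists directly inside dir. */
  static boolean hasLockFile(Path dir, String lockName) {
    return Files.isRegularFile(dir.resolve(lockName));
  }

  public static void main(String[] args) {
    // Defaults are the checkpoint and data dirs seen in the log above.
    String[] dirs = args.length > 0
        ? args
        : new String[] {"/tmp/flume-check", "/tmp/flume-data"};
    for (String d : dirs) {
      Path dir = Paths.get(d);
      boolean present = hasLockFile(dir, ASSUMED_LOCK_NAME);
      System.out.println(dir + ": lock file " + (present ? "present" : "absent"));
    }
  }
}
```

This only checks for the presence of the file on disk. It says nothing about 
whether a process currently holds the lock.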


The interceptor I'm using is:

{code}
package org.apache.flume.interceptor.custom;

import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class TimestampInterceptor implements Interceptor {

  private TimestampInterceptor() {

  }

  @Override
  public void initialize() {

  }

  @Override
  public Event intercept(Event event) {
    event.getHeaders().put("timestamp",
        String.valueOf(System.currentTimeMillis()));
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event e : events) {
      intercept(e);
    }
    return events;
  }

  @Override
  public void close() {

  }

  public static class Builder implements Interceptor.Builder {

    @Override
    public void configure(Context context) {

    }

    @Override
    public Interceptor build() {
      return new TimestampInterceptor();
    }

  }

}
{code}
                
> Cannot start agent a 3rd time when using FileChannel
> ----------------------------------------------------
>
>                 Key: FLUME-1232
>                 URL: https://issues.apache.org/jira/browse/FLUME-1232
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.2.0
>         Environment: RHEL 5.6 64-bit
>            Reporter: Will McQueen
>            Priority: Blocker
>             Fix For: v1.2.0
>
>
> Steps:
> 1) Start clean by wiping out FileChannel's existing checkpoint dir and data 
> dir
> 2) Configure the agent to use FileChannel (type = FILE). The config file I 
> used is at the end of this text.
> 3) Start the agent, confirm lock files exist in data and checkpoint dirs, 
> stop agent, confirm lock files are removed from data and checkpoint dirs.
> 4) Repeat step 3
> 5) Start the agent. The following exceptions are shown in the logs:
> 2012-05-29 03:15:36,453 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286275813, lastCheckpoint = 1338286279596, fileId = 1, offset = 1924, 
> type = Commit, transaction 1338285619250
> 2012-05-29 03:15:36,453 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286279783, lastCheckpoint = 1338286279596, fileId = 1, offset = 1949, 
> type = Take, transaction 1338285619251
> 2012-05-29 03:15:36,453 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286279784, lastCheckpoint = 1338286279596, fileId = 1, offset = 1980, 
> type = Commit, transaction 1338285619251
> 2012-05-29 03:15:36,453 DEBUG file.ReplayHandler: Processing commit of Take
> 2012-05-29 03:15:36,453 INFO file.ReplayHandler: Replayed 1 from 
> /var/run/flume-ng/.flume/file-channel/data/log-1
> 2012-05-29 03:15:36,453 INFO file.ReplayHandler: Replaying 
> /var/run/flume-ng/.flume/file-channel/data/log-2
> 2012-05-29 03:15:36,454 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286370280, lastCheckpoint = 1338286279596, fileId = 2, offset = 8, type = 
> Take, transaction 1338286369982
> 2012-05-29 03:15:36,454 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286370287, lastCheckpoint = 1338286279596, fileId = 2, offset = 39, type 
> = Commit, transaction 1338286369982
> 2012-05-29 03:15:36,454 DEBUG file.ReplayHandler: Processing commit of Take
> 2012-05-29 03:15:36,454 INFO file.ReplayHandler: Unable to remove 
> FlumeEventPointer [fileID=1, offset=1853] added to pending list
> 2012-05-29 03:15:36,454 INFO file.ReplayHandler: Replayed 1 from 
> /var/run/flume-ng/.flume/file-channel/data/log-2
> 2012-05-29 03:15:36,454 DEBUG file.ReplayHandler: Pending take 
> FlumeEventPointer [fileID=1, offset=1853]
> 2012-05-29 03:15:36,455 ERROR file.Log: Failed to initialize Log
> java.lang.IllegalStateException: Pending takes 1 exist after the end of replay
>         at 
> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at 
> org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:137)
>         at org.apache.flume.channel.file.Log.replay(Log.java:205)
>         at 
> org.apache.flume.channel.file.FileChannel.start(FileChannel.java:180)
>         at 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> 2012-05-29 03:15:36,457 ERROR lifecycle.LifecycleSupervisor: Unable to start 
> org.apache.flume.channel.file.FileChannel@1ac88440 - Exception follows.
> java.lang.IllegalStateException: Log is closed
>         at 
> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at org.apache.flume.channel.file.Log.getFlumeEventQueue(Log.java:226)
>         at 
> org.apache.flume.channel.file.FileChannel.getDepth(FileChannel.java:253)
>         at 
> org.apache.flume.channel.file.FileChannel.start(FileChannel.java:187)
>         at 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> 2012-05-29 03:15:36,458 ERROR flume.SinkRunner: Unhandled exception, logging 
> and sleeping for 5000ms
> java.lang.IllegalStateException: Channel closed
>         at 
> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at 
> org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:237)
>         at 
> org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:118)
>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:61)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:662)
> 2012-05-29 03:15:39,460 INFO file.FileChannel: Starting FileChannel with 
> dataDir [/var/run/flume-ng/.flume/file-channel/data]
> 2012-05-29 03:15:39,460 INFO file.Log: Cannot lock 
> /var/run/flume-ng/.flume/file-channel/checkpoint. The directory is already 
> locked.
> 2012-05-29 03:15:39,461 ERROR lifecycle.LifecycleSupervisor: Unable to start 
> org.apache.flume.channel.file.FileChannel@1ac88440 - Exception follows.
> java.lang.RuntimeException: java.io.IOException: Cannot lock 
> /var/run/flume-ng/.flume/file-channel/checkpoint. The directory is already 
> locked.
>         at com.google.common.base.Throwables.propagate(Throwables.java:156)
>         at 
> org.apache.flume.channel.file.FileChannel.start(FileChannel.java:182)
>         at 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> Config file I used (flume.conf):
> agent.channels = c1
> agent.sources = r1
> agent.sinks = k1
> #
> agent.channels.c1.type = FILE
> #
> agent.sources.r1.channels = c1
> agent.sources.r1.type = NETCAT
> agent.sources.r1.bind = 0.0.0.0
> agent.sources.r1.port = 41414
> #
> agent.sinks.k1.channel = c1
> agent.sinks.k1.type = LOGGER


        
