We have set up a configuration that uses the collector sink with our own
sink, GlobalLogSink. The configuration is:

collector (15000) { GlobalLogSink(...) }

The GlobalLogSink connects to Cassandra, which we have not fully tuned yet.
We are running into an exception in the RollSink code that causes the
DirectDriver to close down. It happens when Cassandra does a garbage
collection or some other time-intensive operation that makes a response
take longer than usual. I have pasted two traces at the end of this email,
one from the collector log and the other from jstack.

The code at RollSink.java:296 (in close()) tries to acquire the write lock
with a wait of 1000 ms; if the in-progress write to Cassandra takes longer
than that, an exception occurs that shuts the sink down.
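To illustrate the failure mode, here is a small, self-contained sketch (not Flume's code) of the pattern the stack traces point to: one thread holds the read lock of a ReentrantReadWriteLock while a slow append is in flight, and another thread calls writeLock().tryLock with a 1000 ms timeout, as RollSink.close() appears to do. The class and method names and the 2000/100 ms append durations are made up for illustration. Per the java.util.concurrent documentation, a timed tryLock returns false when the wait expires and throws InterruptedException only if the waiting thread is interrupted, so the InterruptedException in our trace suggests the closing thread was also interrupted while it waited.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TimedWriteLockDemo {

    /**
     * Holds the read lock on a background thread (standing in for an append
     * blocked on a slow Cassandra call), then tries to take the write lock
     * with the given timeout, as a close() path would. Returns true if the
     * write lock was acquired within the timeout.
     */
    static boolean tryCloseWhileAppending(long appendMillis, long timeoutMillis)
            throws InterruptedException {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch readerHoldsLock = new CountDownLatch(1);

        Thread appender = new Thread(() -> {
            lock.readLock().lock();
            try {
                readerHoldsLock.countDown();
                Thread.sleep(appendMillis); // simulated slow backend write
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.readLock().unlock();
            }
        });
        appender.start();
        readerHoldsLock.await(); // make sure the "append" holds the read lock first

        // Timed acquisition: returns false on timeout, throws only if interrupted.
        boolean acquired = lock.writeLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            lock.writeLock().unlock();
        }
        appender.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        // Append stalls for 2 s, close waits 1 s: the write lock is not obtained.
        System.out.println(tryCloseWhileAppending(2000, 1000));
        // Append finishes in 100 ms, close waits 1 s: the write lock is obtained.
        System.out.println(tryCloseWhileAppending(100, 1000));
    }
}
```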

In our development environment this problem went away, but there we were
using the first flume-0.9.4 release, which used 'synchronized' locking
before the ReentrantReadWriteLock was introduced. To get that behavior we
had to back down a release or two from flume-0.9.4+25.6, and then we ran
into ack problems.

A couple of fixes are needed to correct this issue:
1) Add a fix to RollSink.java that makes the 1000 ms wait configurable.
2) When the timeout does occur, the sink should fail gracefully rather than
just hang or close down.
I would like to open a JIRA for this.
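One possible shape for those two fixes, sketched under assumptions: ConfigurableCloseGuard and closeTimeoutMillis are hypothetical names, wiring the timeout to a Flume configuration property is not shown, and this is not a patch against RollSink itself. The idea is to take the wait as a constructor argument instead of a hard-coded 1000 ms, and to turn both a timeout and an interrupt into an IOException instead of letting the InterruptedException escape to the DirectDriver.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical sketch: a close() guard with a configurable lock timeout that
 * reports failure as an IOException rather than an InterruptedException.
 */
public class ConfigurableCloseGuard {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final long closeTimeoutMillis; // hypothetical setting, e.g. read from config

    public ConfigurableCloseGuard(long closeTimeoutMillis) {
        if (closeTimeoutMillis <= 0) {
            throw new IllegalArgumentException("timeout must be positive");
        }
        this.closeTimeoutMillis = closeTimeoutMillis;
    }

    /** Appends would take rwLock().readLock(); close() takes the write lock. */
    public ReentrantReadWriteLock rwLock() {
        return lock;
    }

    public void close(Runnable closeAction) throws IOException {
        boolean acquired;
        try {
            acquired = lock.writeLock().tryLock(closeTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
            throw new IOException("interrupted while waiting to close", e);
        }
        if (!acquired) {
            // Timed out: report a failure instead of shutting the driver down.
            throw new IOException("timed out after " + closeTimeoutMillis + " ms waiting to close");
        }
        try {
            closeAction.run();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Whether the retrying decorators seen in the trace (InsistentAppendDecorator, StubbornAppendSink) would then recover cleanly from that IOException is an assumption that would need checking against the 0.9.4-cdh3u1 source.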

To fix the problem I need to know which source branch to use. This error
occurs in the source associated with the flume-0.9.4+25 RPM, and the
matching source we found is 0.9.4-cdh3u1. Is there a branch or tag that we
should use for this fix?

A couple of related questions we'd appreciate answers to:
1) Is there a way to set up a collector sink that doesn't use RollSink?
2) Is there a way to get acknowledgments without using RollSink?

Ideally, we would like to have something like an 'AckSink' that lets us
use E2E without the RollSink functionality.
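To make the 'AckSink' idea concrete, here is a minimal sketch under loudly stated assumptions: Event, Sink, AckListener and AckSink are hypothetical, simplified stand-ins rather than Flume's EventSink API, and the checksum checks (AckChecksumChecker in the trace) and whatever reporting Flume's E2E mode does are not modeled. It only shows the ordering we have in mind: an ack group is acknowledged as soon as its end marker has been written downstream, rather than when a RollSink rolls.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class AckSinkSketch {

    /** Hypothetical stand-in for an event; ackTag marks the end of an ack group. */
    static final class Event {
        final String body;
        final String ackTag; // null for ordinary events

        Event(String body, String ackTag) {
            this.body = body;
            this.ackTag = ackTag;
        }
    }

    /** Hypothetical minimal sink interface (not Flume's EventSink). */
    interface Sink {
        void append(Event e) throws IOException;
    }

    /** Hypothetical callback standing in for "report this ack group as delivered". */
    interface AckListener {
        void acked(String ackTag);
    }

    /** Forwards events downstream and acks a group only after its end marker is written. */
    static final class AckSink implements Sink {
        private final Sink downstream;
        private final AckListener listener;

        AckSink(Sink downstream, AckListener listener) {
            this.downstream = downstream;
            this.listener = listener;
        }

        @Override
        public void append(Event e) throws IOException {
            downstream.append(e); // if this throws, no ack is sent and the group can be resent
            if (e.ackTag != null) {
                listener.acked(e.ackTag);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> written = new ArrayList<>();
        List<String> acks = new ArrayList<>();
        Sink store = e -> written.add(e.body); // stands in for GlobalLogSink
        AckSink sink = new AckSink(store, acks::add);

        sink.append(new Event("event-1", null));
        sink.append(new Event("event-2", "group-A"));
        System.out.println(written); // [event-1, event-2]
        System.out.println(acks);    // [group-A]
    }
}
```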

Thanks in advance,
Derek Deeter




2011-09-15 11:17:24,425 INFO com.cloudera.flume.handlers.rolling.RollSink: closing RollSink 'globalLogSink( "Global_Logging", "Audit_Log", "192.168.80.138:9160", "192.168.80.140:9160", "192.168.80.141:9160", "192.168.80.148:9160" )'
2011-09-15 11:17:24,429 ERROR com.cloudera.flume.core.connector.DirectDriver: Closing down due to exception during append calls
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1159)
        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.tryLock(ReentrantReadWriteLock.java:976)
        at com.cloudera.flume.handlers.rolling.RollSink.close(RollSink.java:296)
        at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
        at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
        at com.cloudera.flume.handlers.debug.InsistentOpenDecorator.close(InsistentOpenDecorator.java:175)
        at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
        at com.cloudera.flume.handlers.debug.StubbornAppendSink.append(StubbornAppendSink.java:78)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.debug.InsistentAppendDecorator.append(InsistentAppendDecorator.java:110)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.endtoend.AckChecksumChecker.append(AckChecksumChecker.java:172)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.batch.UnbatchingDecorator.append(UnbatchingDecorator.java:62)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.batch.GunzipDecorator.append(GunzipDecorator.java:81)
        at com.cloudera.flume.collector.CollectorSink.append(CollectorSink.java:222)
        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:110)
2011-09-15 11:17:24,430 INFO com.cloudera.flume.core.connector.DirectDriver: Connector logicalNode strafe1a.web.prod.diginsite.com-19 exited with error: null
java.lang.InterruptedException

*This is what a jstack trace showed at the moment the collector hung:*

2011-09-15 15:42:10
Full thread dump Java HotSpot(TM) 64-Bit Server VM (11.0-b16 mixed mode):

"pool-2-thread-1" prio=10 tid=0x00002aab3c064400 nid=0x16a9 runnable [0x0000000041523000..0x0000000041523d10]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:129)
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
        at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
        at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
        at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
        at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
        at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:1025)
        at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:1009)
        at com.intuit.ifs.globallogging.cassandra.plugin.CassandraClient.batchMutate(CassandraClient.java:155)
        at com.intuit.ifs.globallogging.cassandra.plugin.CassandraClient.insert(CassandraClient.java:139)
        at com.intuit.ifs.globallogging.cassandra.plugin.GlobalLogSink.append(GlobalLogSink.java:101)
        at com.cloudera.flume.core.CompositeSink.append(CompositeSink.java:61)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.rolling.RollSink.synchronousAppend(RollSink.java:234)
        at com.cloudera.flume.handlers.rolling.RollSink$1.call(RollSink.java:183)
        at com.cloudera.flume.handlers.rolling.RollSink$1.call(RollSink.java:181)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
