Andy Rosa created FLUME-2414:
--------------------------------
Summary: java.io.IOException: DFSOutputStream is closed on HDFSDataStream.sync
Key: FLUME-2414
URL: https://issues.apache.org/jira/browse/FLUME-2414
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: v1.5.0, v1.4.0
Environment: Java(TM) SE Runtime Environment (build 1.7.0_01-b08)
Hadoop 1.2.1 with just 1 data-node
Reporter: Andy Rosa
We are currently using HDFS with just one datanode to aggregate 10 HDDs into a
single file system. When that datanode becomes unavailable for even a short
period of time, the exception below starts being thrown endlessly.
{code}
java.io.IOException: DFSOutputStream is closed
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3879)
    at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
    at org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:117)
    at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:369)
    at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:366)
    at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:559)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:161)
    at org.apache.flume.sink.hdfs.BucketWriter.access$1000(BucketWriter.java:57)
    at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:556)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
{code}
As I understand it, the issue with endless .close() attempts has already been
solved, but the issue with endless .doFlush() retries is still there; the
sketch below illustrates the failure pattern.
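To make the failure mode concrete, here is a minimal standalone sketch
(hypothetical class, no real HDFS involved): once the underlying stream has
been closed, every subsequent flush attempt throws the same IOException, and
nothing in the loop ever resets or reopens the writer.
{code}
import java.io.IOException;

public class FlushRetrySketch {
    static boolean streamClosed = true; // stands in for the lost datanode

    // Stand-in for writer.sync(): always fails while the stream is closed.
    static void sync() throws IOException {
        if (streamClosed) {
            throw new IOException("DFSOutputStream is closed");
        }
    }

    public static void main(String[] args) {
        // Nothing ever reopens the stream, so every flush attempt fails;
        // capped at 5 iterations here only to keep the demo finite.
        for (int attempt = 1; attempt <= 5; attempt++) {
            try {
                sync();
            } catch (IOException e) {
                System.out.println("flush attempt " + attempt + " failed: " + e.getMessage());
            }
        }
    }
}
{code}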
I wrote a rough hot-fix:
{code}
/**
 * doFlush() must only be called by flush().
 * Rough hot-fix: after DO_FLUSH_ERROR_LIMIT consecutive sync() failures,
 * stop rethrowing and skip the flush instead of retrying endlessly.
 * @throws IOException
 */
private void doFlush() throws IOException, InterruptedException {
  try {
    LOG.info("doFlush() called. doFlushErrorCounter=" + doFlushErrorCounter.get());
    callWithTimeout(new CallRunner<Void>() {
      @Override
      public Void call() throws Exception {
        writer.sync(); // could block
        return null;
      }
    });
    LOG.info("doFlush() finished. doFlushErrorCounter=" + doFlushErrorCounter.get());
  } catch (IOException e) {
    int errorsCount = doFlushErrorCounter.incrementAndGet();
    LOG.error("IO EXCEPTION DURING FLUSH. Errors Count: " + errorsCount, e);
    if (errorsCount <= DO_FLUSH_ERROR_LIMIT) {
      // Still under the limit: back off, then rethrow so the caller retries.
      Thread.sleep(DO_FLUSH_ERROR_SLEEP_TIME_MSEC);
      throw e;
    }
    // Over the limit: fall through and give up on this flush.
  }
  doFlushErrorCounter.set(0);
  batchCounter = 0;
}
{code}
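The hot-fix relies on a few fields that are not in the stock BucketWriter;
something like the following is assumed (the names match the code above, the
values are just illustrative):
{code}
// Assumed supporting state for the hot-fix above; values are illustrative.
// Requires: import java.util.concurrent.atomic.AtomicInteger;
private static final int DO_FLUSH_ERROR_LIMIT = 10;               // give up after this many consecutive failures
private static final long DO_FLUSH_ERROR_SLEEP_TIME_MSEC = 5000L; // back-off before rethrowing
private final AtomicInteger doFlushErrorCounter = new AtomicInteger(0); // consecutive doFlush() failures
{code}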
With this rough fix I simply skip the .doFlush() call after a certain number
of failed attempts. However, I found that after some time some groups of logs
still stop being written.
My Flume configuration is below:
{code}
agent.sources = avroSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
# SOURCE
agent.sources.avroSource.type=avro
agent.sources.avroSource.bind=204.155.149.34
agent.sources.avroSource.port=41418
agent.sources.avroSource.channels=memoryChannel
agent.sources.avroSource.interceptors = i1
agent.sources.avroSource.interceptors.i1.type = timestamp
# CHANNEL
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=100000
agent.channels.memoryChannel.transactionCapacity=10000
#SINK
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel=memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-master:9000/globallog/%{env}/%{name}/date=%d%m%y
agent.sinks.hdfsSink.hdfs.rollInterval=3600
agent.sinks.hdfsSink.hdfs.rollSize=64000000
agent.sinks.hdfsSink.hdfs.idleTimeout=3600
agent.sinks.hdfsSink.hdfs.batchSize=1000
agent.sinks.hdfsSink.hdfs.maxOpenFiles=200
agent.sinks.hdfsSink.hdfs.callTimeout=240000
agent.sinks.hdfsSink.hdfs.inUsePrefix=.
agent.sinks.hdfsSink.hdfs.rollCount=0
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream
{code}