Andy Rosa created FLUME-2414:
--------------------------------

             Summary: java.io.IOException: DFSOutputStream is closed on HDFSDataStream.sync
                 Key: FLUME-2414
                 URL: https://issues.apache.org/jira/browse/FLUME-2414
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.5.0, v1.4.0
         Environment: Java(TM) SE Runtime Environment (build 1.7.0_01-b08)

Hadoop 1.2.1 with just 1 data-node 
            Reporter: Andy Rosa


Currently we are using HDFS with just one datanode to aggregate 10 HDDs into a single file system. When that datanode becomes unavailable for a short period of time, the exception below starts being thrown endlessly.

{code}
java.io.IOException: DFSOutputStream is closed
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3879)
        at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
        at org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:117)
        at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:369)
        at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:366)
        at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:559)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:161)
        at org.apache.flume.sink.hdfs.BucketWriter.access$1000(BucketWriter.java:57)
        at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:556)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
{code}

As I understand it, the issue with endless .close() attempts has already been solved, but the issue with endless .doFlush() retries is still there.
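
For reference, the stock doFlush() in BucketWriter (a simplified reconstruction from the 1.4/1.5 source, essentially the hot-fix below minus the error counting) has no error handling around writer.sync(), so every retry hits the same closed stream:

{code}
  /**
   * doFlush() must only be called by flush()
   * @throws IOException
   */
  private void doFlush() throws IOException, InterruptedException {
    callWithTimeout(new CallRunner<Void>() {
      @Override
      public Void call() throws Exception {
        writer.sync(); // could block, and throws once the DFSOutputStream is closed
        return null;
      }
    });
    batchCounter = 0;
  }
{code}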

I wrote a rough hot-fix:

{code}
  /**
   * doFlush() must only be called by flush()
   * @throws IOException
   */
  private void doFlush() throws IOException, InterruptedException {
    try {
      LOG.info("doFlush() called. doFlushErrorCounter=" + doFlushErrorCounter.get());
      callWithTimeout(new CallRunner<Void>() {
        @Override
        public Void call() throws Exception {
          writer.sync(); // could block
          return null;
        }
      });
      LOG.info("doFlush() finished. doFlushErrorCounter=" + doFlushErrorCounter.get());
    } catch (IOException e) {
      int errorsCount = doFlushErrorCounter.incrementAndGet();
      LOG.error("IO EXCEPTION DURING FLUSH. Errors Count: " + errorsCount, e);
      if (errorsCount <= DO_FLUSH_ERROR_LIMIT) {
        // below the limit: back off, then rethrow so the caller retries
        Thread.sleep(DO_FLUSH_ERROR_SLEEP_TIME_MSEC);
        throw e;
      }
      // limit exceeded: swallow the exception and skip this flush
    }
    doFlushErrorCounter.set(0);
    batchCounter = 0;
  }
{code}
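
The hot-fix also relies on a counter and two constants that are not shown above. A minimal sketch of how they could be declared in BucketWriter (the names come from my patch; the limit and back-off values here are placeholders, not tuned):

{code}
  // requires: import java.util.concurrent.atomic.AtomicInteger;
  // declarations backing the hot-fix above; limit and sleep values are examples only
  private final AtomicInteger doFlushErrorCounter = new AtomicInteger(0);
  private static final int DO_FLUSH_ERROR_LIMIT = 5;
  private static final long DO_FLUSH_ERROR_SLEEP_TIME_MSEC = 1000L;
{code}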

With this rough fix I just skip the .doFlush() call after a certain number of failed attempts. However, I found that after some time, some groups of logs still stop being written.

My Flume configuration is below:

{code}
agent.sources = avroSource
agent.channels = memoryChannel
agent.sinks = hdfsSink

# SOURCE
agent.sources.avroSource.type=avro
agent.sources.avroSource.bind=204.155.149.34
agent.sources.avroSource.port=41418
agent.sources.avroSource.channels=memoryChannel
agent.sources.avroSource.interceptors = i1
agent.sources.avroSource.interceptors.i1.type = timestamp

# CHANNEL
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=100000
agent.channels.memoryChannel.transactionCapacity=10000

#SINK
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel=memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-master:9000/globallog/%{env}/%{name}/date=%d%m%y
agent.sinks.hdfsSink.hdfs.rollInterval=3600
agent.sinks.hdfsSink.hdfs.rollSize=64000000
agent.sinks.hdfsSink.hdfs.idleTimeout=3600
agent.sinks.hdfsSink.hdfs.batchSize=1000
agent.sinks.hdfsSink.hdfs.maxOpenFiles=200
agent.sinks.hdfsSink.hdfs.callTimeout=240000
agent.sinks.hdfsSink.hdfs.inUsePrefix=.
agent.sinks.hdfsSink.hdfs.rollCount=0
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream

{code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)
