[ https://issues.apache.org/jira/browse/FLUME-2414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andy Rosa updated FLUME-2414:
-----------------------------

    Description: 
Currently we are using HDFS with just one datanode to aggregate 10 HDDs into a single file system. When that datanode becomes unavailable for a short period of time, the exception below starts being thrown endlessly.

{code}
java.io.IOException: DFSOutputStream is closed
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3879)
        at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
        at org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:117)
        at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:369)
        at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:366)
        at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:559)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:161)
        at org.apache.flume.sink.hdfs.BucketWriter.access$1000(BucketWriter.java:57)
        at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:556)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
{code}

As I understand it, the issue with endless .close() attempts has already been solved, but the issue with endless .doFlush() calls is still there.

I wrote a rough hot-fix:

{code}
  /**
   * doFlush() must only be called by flush()
   * @throws IOException
   */
  private void doFlush() throws IOException, InterruptedException {
    try {
      LOG.info("doFlush() called. doFlushErrorCounter=" + doFlushErrorCounter.get());
      callWithTimeout(new CallRunner<Void>() {
        @Override
        public Void call() throws Exception {
          writer.sync(); // could block
          return null;
        }
      });
      LOG.info("doFlush() finished. doFlushErrorCounter=" + doFlushErrorCounter.get());
    } catch (IOException e) {
      int errorsCount = doFlushErrorCounter.incrementAndGet();
      LOG.error("IO EXCEPTION DURING FLUSH. Errors Count: " + errorsCount, e);
      if (errorsCount <= DO_FLUSH_ERROR_LIMIT) {
        // back off and rethrow so the caller retries; once the limit is exceeded
        // the exception is swallowed and the counters below are reset
        Thread.sleep(DO_FLUSH_ERROR_SLEEP_TIME_MSEC);
        throw e;
      }
    }
    doFlushErrorCounter.set(0);
    batchCounter = 0;
  }
{code}
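
The snippet above also relies on a few supporting fields that I did not paste. A minimal sketch of those declarations is below; the names match the snippet, but the limit and back-off values are just placeholders, not tuned recommendations:

{code}
  // Supporting fields assumed by the hot-fix above (values are illustrative placeholders)
  // counts consecutive doFlush() failures across retries
  private final java.util.concurrent.atomic.AtomicInteger doFlushErrorCounter =
      new java.util.concurrent.atomic.AtomicInteger(0);
  // after this many consecutive flush failures the error is swallowed and the counter resets
  private static final int DO_FLUSH_ERROR_LIMIT = 10;
  // back-off before rethrowing, so we do not hammer a datanode that may still be recovering
  private static final long DO_FLUSH_ERROR_SLEEP_TIME_MSEC = 5000L;
{code}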

With this rough fix I simply skip the .doFlush() call after a certain number of failed attempts. However, I found that after some time some groups of logs still stop being written.

My Flume configuration is below:

{code}
agent.sources = avroSource
agent.channels = memoryChannel
agent.sinks = hdfsSink

# SOURCE
agent.sources.avroSource.type=avro
agent.sources.avroSource.bind=1.1.1.1
agent.sources.avroSource.port=11111
agent.sources.avroSource.channels=memoryChannel
agent.sources.avroSource.interceptors = i1
agent.sources.avroSource.interceptors.i1.type = timestamp

# CHANNEL
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=100000
agent.channels.memoryChannel.transactionCapacity=10000

#SINK
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel=memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-master:9000/globallog/%{env}/%{name}/date=%d%m%y
agent.sinks.hdfsSink.hdfs.rollInterval=3600
agent.sinks.hdfsSink.hdfs.rollSize=64000000
agent.sinks.hdfsSink.hdfs.idleTimeout=3600
agent.sinks.hdfsSink.hdfs.batchSize=1000
agent.sinks.hdfsSink.hdfs.maxOpenFiles=200
agent.sinks.hdfsSink.hdfs.callTimeout=240000
agent.sinks.hdfsSink.hdfs.inUsePrefix=.
agent.sinks.hdfsSink.hdfs.rollCount=0
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream

{code}
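
For context on the sink path: the %{env} and %{name} escapes are filled from event headers, and %d%m%y uses the timestamp header added by the timestamp interceptor on the source. So, for example, an event with env=prod and name=EMBED_PLAYER received on 18 June 2014 lands under hdfs://hadoop-master:9000/globallog/prod/EMBED_PLAYER/date=180614/, which matches the directories seen in the shutdown log below.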

When I trigger a shutdown, the agent-shutdown-hook prints a lot of log entries about old, still-open files:

{code}
2014-06-24 05:59:29,176 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:437)] Closing hdfs://hadoop-master:9000/globallog/prod/FILES_WRONG_SIZE/date=180614/FlumeData
2014-06-24 05:59:29,176 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:297)] HDFSWriter is already closed: hdfs://hadoop-master:9000/globallog/prod/FILES_WRONG_SIZE/date=180614/.FlumeData.1403049638474.tmp
2014-06-24 05:59:29,176 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:437)] Closing hdfs://hadoop-master:9000/globallog/prod/FILE_OBJ_INCORRECT_MD5_DIGEST/date=180614/FlumeData
2014-06-24 05:59:29,176 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:297)] HDFSWriter is already closed: hdfs://hadoop-master:9000/globallog/prod/FILE_OBJ_INCORRECT_MD5_DIGEST/date=180614/.FlumeData.1403050060059.tmp
2014-06-24 05:59:29,177 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:437)] Closing hdfs://hadoop-master:9000/globallog/prod/FILE_OBJ_NULL/date=180614/FlumeData
2014-06-24 05:59:29,177 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:297)] HDFSWriter is already closed: hdfs://hadoop-master:9000/globallog/prod/FILE_OBJ_NULL/date=180614/.FlumeData.1403054517880.tmp
2014-06-24 05:59:29,177 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:437)] Closing hdfs://hadoop-master:9000/globallog/prod/REST_MONITORING/date=180614/FlumeData
2014-06-24 05:59:29,177 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:297)] HDFSWriter is already closed: hdfs://hadoop-master:9000/globallog/prod/REST_MONITORING/date=180614/.FlumeData.1403049621998.tmp
2014-06-24 05:59:29,177 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:437)] Closing hdfs://hadoop-master:9000/globallog/prod/EMBED_PLAYER/date=180614/FlumeData
2014-06-24 05:59:29,177 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:297)] HDFSWriter is already closed: hdfs://hadoop-master:9000/globallog/prod/EMBED_PLAYER/date=180614/.FlumeData.1403049621963.tmp
2014-06-24 05:59:29,177 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:437)] Closing hdfs://hadoop-master:9000/globallog/prod/EMBED_PLAYER/date=190614/FlumeData
2014-06-24 05:59:29,177 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:297)] HDFSWriter is already closed: hdfs://hadoop-master:9000/globallog/prod/EMBED_PLAYER/date=190614/.FlumeData.1403136037015.tmp
2014-06-24 05:59:29,178 (agent-shutdown-hook) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:437)] Closing hdfs://hadoop-master:9000/globallog/prod/FILE_OBJ_INCORRECT_MD5_DIGEST/date=190614/FlumeData
{code}

But according to the log data, those files never failed on .sync().



> java.io.IOException: DFSOutputStream is closed on HDFSDataStream.sync
> ---------------------------------------------------------------------
>
>                 Key: FLUME-2414
>                 URL: https://issues.apache.org/jira/browse/FLUME-2414
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0, v1.5.0
>         Environment: Java(TM) SE Runtime Environment (build 1.7.0_01-b08)
> Hadoop 1.2.1 with just 1 data-node 
>            Reporter: Andy Rosa
>



--
This message was sent by Atlassian JIRA
(v6.2#6252)
