[
https://issues.apache.org/jira/browse/FLUME-2429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072712#comment-14072712
]
Jay commented on FLUME-2429:
----------------------------
Thank you, chenshangan.
I use Flume to collect logs into HDFS (one 200~250MB log and two 50~70MB logs
per minute). The available network bandwidth is 1GB.
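For scale, the log sizes above can be turned into a rough sustained rate. This is only arithmetic on the figures already given; the variable names are mine, and it assumes all three logs arrive within the same minute.

```python
# Rough data rate implied by the figures above (sizes in MB per minute).
# Assumption: one 200~250MB log plus two 50~70MB logs land in the same minute.

def per_second(mb_per_minute):
    """Convert a per-minute volume in MB to an average MB/s rate."""
    return mb_per_minute / 60.0

low_mb_per_min = 200 + 2 * 50    # smallest case: 300 MB per minute
high_mb_per_min = 250 + 2 * 70   # largest case: 390 MB per minute

print(f"{low_mb_per_min}-{high_mb_per_min} MB per minute")
print(f"{per_second(low_mb_per_min):.1f}-{per_second(high_mb_per_min):.1f} MB/s on average")
```

This is an average only; the actual transfer happens in a burst of a few seconds each minute, so the peak rate is higher.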
Usually, the Flume process starts at HH:MM:05 and ends at HH:MM:10~15, but a
WARNING sometimes occurs (when callTimeout is set between 10000 and 15000).
When that happens, some lines in the final sinked log files are duplicated.
This is the main problem: the expected size is 200MB, but the final sinked
files are between 250MB and 400MB because of the duplicated lines. There were
no errors in HDFS, the NameNodes, or other Hadoop components.
After changing callTimeout to 60000 or more, there have been no more errors in
the Flume log file, and the final sinked log files match the expected size.
My goal is to collect logs every minute with Flume, and I would like to know
why a short callTimeout (10000ms or 15000ms) produces duplicated lines in a
sinked file.
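One mechanism that would fit these symptoms, offered as an assumption and not confirmed anywhere in this issue: the sink takes a batch of events from the channel inside a transaction, and if a write times out partway through, the transaction is rolled back and the whole batch is delivered again, while the lines already written to the file stay there. The toy simulation below is not Flume code. `drain`, `SimulatedTimeout`, and every parameter name are hypothetical and exist only to show how a rollback plus retry can duplicate lines.

```python
from collections import deque


class SimulatedTimeout(Exception):
    """Hypothetical stand-in for the 'Callable timed out' IOException."""


def drain(channel_events, batch_size, fail_on_attempt=None, written_before_failure=0):
    """Drain a simulated channel into a simulated file using batch transactions.

    On the attempt numbered `fail_on_attempt`, a timeout is raised after
    `written_before_failure` events of that batch have been written.
    """
    channel = deque(channel_events)
    sink_file = []  # lines that reached the simulated output file
    attempt = 0
    while channel:
        attempt += 1
        # "take" up to batch_size events inside one transaction
        batch = [channel.popleft() for _ in range(min(batch_size, len(channel)))]
        try:
            for i, event in enumerate(batch):
                if attempt == fail_on_attempt and i == written_before_failure:
                    raise SimulatedTimeout()
                sink_file.append(event)  # this write is never undone
            # reaching here models a commit: the batch leaves the channel for good
        except SimulatedTimeout:
            # rollback: the whole batch returns to the front of the channel,
            # but the lines already appended to sink_file remain in place
            channel.extendleft(reversed(batch))
    return sink_file


events = list(range(10))
result = drain(events, batch_size=5, fail_on_attempt=1, written_before_failure=3)
print(len(events), "events in,", len(result), "lines out")
```

If this is what happens, a large batchSize such as 150000 would mean a single timeout can replay a large number of already-written lines, which would be in line with the size growth I see.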
Thanks in advance.
> Callable timed out in HDFS sink
> -------------------------------
>
> Key: FLUME-2429
> URL: https://issues.apache.org/jira/browse/FLUME-2429
> Project: Flume
> Issue Type: Bug
> Affects Versions: v1.4.0
> Reporter: Jay
>
> Hi.
> I got a warning message when using the HDFS sink.
> AVRO source > Memory (or File) channel > HDFS sink
> Switching channel type didn't solve the problem.
> The error occurs once a day or once every several days.
> Is there any solution?
> Here is my configuration.
> --------------------------------------------------------------------
> testAgent.sources = testSrc
> testAgent.channels = testChannel
> testAgent.sinks = testSink
> testAgent.sources.testSrc.type = avro
> testAgent.sources.testSrc.channels = testChannel
> testAgent.channels.testChannel.type = memory
> testAgent.sources.testSrc.bind = 0.0.0.0
> testAgent.sources.testSrc.port = 4141
> testAgent.sinks.testSink.type = hdfs
> testAgent.sinks.testSink.channel = testChannel
> testAgent.sources.testSrc.interceptors = testInterceptor
> testAgent.sources.testSrc.interceptors.testInterceptor.type = static
> testAgent.sources.testSrc.interceptors.testInterceptor.preserveExisting = true
> testAgent.sources.testSrc.interceptors.testInterceptor.key = testKey
> testAgent.sources.testSrc.interceptors.testInterceptor.value = .testfile
> testAgent.sinks.testSink.hdfs.path = hdfs://hadoop-cluster:8020/flume/%Y%m%d
> testAgent.sinks.testSink.hdfs.filePrefix = %Y%m%d%H%M
> testAgent.sinks.testSink.hdfs.fileSuffix = .testfile
> testAgent.sinks.testSink.hdfs.fileType = DataStream
> testAgent.sinks.testSink.hdfs.rollInterval = 1
> testAgent.sinks.testSink.hdfs.rollCount = 0
> testAgent.sinks.testSink.hdfs.rollSize = 0
> testAgent.sinks.testSink.hdfs.batchSize = 150000
> testAgent.sinks.testSink.hdfs.callTimeout = 15000
> testAgent.sinks.testSink.hdfs.useLocalTimeStamp = true
> testAgent.sinks.testSink.serializer = text
> testAgent.sinks.testSink.serializer.appendNewline = false
> testAgent.channels.testChannel.keep-alive = 1
> testAgent.channels.testChannel.write-timeout = 1
> testAgent.channels.testChannel.transactionCapacity = 150000
> testAgent.channels.testChannel.capacity = 18000000
> #testAgent.channels.testChannel.checkpointDir = /data/flumedata/checkpoint
> #testAgent.channels.testChannel.useDualCheckpoints = true
> #testAgent.channels.testChannel.backupCheckpointDir = /data/flumedata_backup/checkpoint
> #testAgent.channels.testChannel.dataDirs = /data/flumedata/data
> testAgent.channels.testChannel.byteCapacityBufferPercentage = 20
> testAgent.channels.testChannel.byteCapacity = 1000000000
> --------------------------------------------------------------------
> I sometimes get a warning message in a flume log.
> --------------------------------------------------------------------
> 2014-07-22 16:28:20,186 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN
> - org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:477)]
> Caught IOException writing to HDFSWriter (Callable timed out after 15000 ms
> on file:
> hdfs://hadoop-cluster:8020/flume/20140722/201407221628.1406014084417.testfile.tmp).
> Closing file
> (hdfs://hadoop-cluster:8020/flume/20140722/201407221628.1406014084417.testfile.tmp)
> and rethrowing exception.
> 2014-07-22 16:28:35,187 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN
> - org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)]
> Caught IOException while closing file
> (hdfs://hadoop-cluster:8020/flume/20140722/201407221628.1406014084417.testfile.tmp).
> Exception follows.
> java.io.IOException: Callable timed out after 15000 ms on file: hdfs://search-hdanal-cluster:8020/flume/20140722/201407221628.1406014084417.testfile.tmp
> at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:603)
> at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:381)
> at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:343)
> at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:292)
> at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:481)
> at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:401)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: java.util.concurrent.TimeoutException
> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
> at java.util.concurrent.FutureTask.get(FutureTask.java:91)
> at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:596)
> ... 8 more
> 2014-07-22 16:28:35,187 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN
> - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:438)]
> HDFS IO error
> java.io.IOException: Callable timed out after 15000 ms on file: hdfs://hadoop-cluster:8020/flume/20140722/201407221628.1406014084417.testfile.tmp
> at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:603)
> at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:469)
> at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:401)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: java.util.concurrent.TimeoutException
> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
> at java.util.concurrent.FutureTask.get(FutureTask.java:91)
> at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:596)
> ... 5 more
> --------------------------------------------------------------------