liu xiaofei created FLUME-2741:
----------------------------------
Summary: HDFS sink could not close file, should remove writer from sfWriters
Key: FLUME-2741
URL: https://issues.apache.org/jira/browse/FLUME-2741
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: v1.6.0, v1.5.2
Reporter: liu xiaofei
While the Flume HDFS sink was writing data to HDFS, the HDFS cluster hit an unknown error
and the NameNode could not offer a healthy node, so the BucketWriter could not be closed.
The timed roll scheduled on timedRollerPool fails because close(true) throws an error. The code is:
if (rollInterval > 0) {
  Callable<Void> action = new Callable<Void>() {
    public Void call() throws Exception {
      LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
          bucketPath, rollInterval);
      try {
        // Roll the file and remove reference from sfWriters map.
        close(true);
      } catch (Throwable t) {
        LOG.error("Unexpected error", t);
      }
      return null;
    }
  };
  timedRollFuture = timedRollerPool.schedule(action, rollInterval,
      TimeUnit.SECONDS);
}
In my HDFS sink configuration, hdfs.rollSize=0, hdfs.rollCount=0 and hdfs.rollInterval=1200,
and the directory path follows YYYY/MM/DD, so file rolling depends only on rollInterval. My
channel is a memory channel. If close fails, HDFSEventSink's sfWriters map never removes the
writer for the failed file, so the HDFS sink stops working and can't consume data from the
channel. If the channel capacity is small or the data volume is large, then:
1. the channel will soon become full;
2. the HDFS sink keeps processing data destined for the failed file, because those events'
timestamps still map to that file's bucket...
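To make the failure mode concrete, here is a minimal Java model. SimpleWriter, writerFor, failOnClose and the "2015/07/01" path are hypothetical stand-ins for BucketWriter, the sink's writer lookup, the HDFS error and a YYYY/MM/DD bucket; this is a sketch under those assumptions, not Flume's code. The writer removes itself from the map only after a successful close, so once close fails, every later lookup for the same bucket returns the broken writer.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the stuck-writer problem (not Flume's actual classes).
// SimpleWriter stands in for BucketWriter; the map stands in for sfWriters.
public class StuckWriterDemo {

  static class SimpleWriter {
    final String path;
    final Map<String, SimpleWriter> registry;
    boolean failOnClose = false; // simulates the HDFS error during close

    SimpleWriter(String path, Map<String, SimpleWriter> registry) {
      this.path = path;
      this.registry = registry;
    }

    // Mirrors the reported behavior: the map entry is removed only after a
    // successful close, so an exception skips the removal entirely.
    void close() throws IOException {
      if (failOnClose) {
        throw new IOException("no healthy node available");
      }
      registry.remove(path);
    }
  }

  // Return the registered writer for a bucket, creating one only if none exists.
  static SimpleWriter writerFor(String path, Map<String, SimpleWriter> registry) {
    return registry.computeIfAbsent(path, p -> new SimpleWriter(p, registry));
  }

  public static void main(String[] args) {
    Map<String, SimpleWriter> sfWriters = new HashMap<>();
    SimpleWriter first = writerFor("2015/07/01", sfWriters);
    first.failOnClose = true;
    try {
      first.close(); // the timed roll
    } catch (IOException e) {
      System.out.println("close failed: " + e.getMessage());
    }
    // New events for the same bucket are still routed to the broken writer.
    SimpleWriter next = writerFor("2015/07/01", sfWriters);
    System.out.println(next == first); // true
  }
}
```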
So I think that when closing the file throws an exception, the writer should still be removed
from sfWriters, like this:
if (rollInterval > 0) {
  Callable<Void> action = new Callable<Void>() {
    public Void call() throws Exception {
      LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
          bucketPath, rollInterval);
      try {
        // Roll the file and remove reference from sfWriters map.
        close(true);
      } catch (Throwable t) {
        LOG.error("Unexpected error", t);
        runCloseAction();
        closed = true;
      }
      return null;
    }
  };
  timedRollFuture = timedRollerPool.schedule(action, rollInterval,
      TimeUnit.SECONDS);
}
Does this look OK?
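A sketch of the proposed change, again using hypothetical classes rather than Flume's BucketWriter: RollingWriter, registry, failOnClose and scheduleRoll are assumptions for illustration, and a single-thread ScheduledExecutorService plays the role of timedRollerPool. When close() throws, the catch block still calls runCloseAction() and sets closed, so the failed writer is dropped from the map and the next event for that bucket would get a fresh writer.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the proposed fix (not Flume's BucketWriter).
public class RollFixDemo {

  static class RollingWriter {
    final String path;
    final Map<String, RollingWriter> registry; // stands in for sfWriters
    volatile boolean closed = false;
    volatile boolean failOnClose = false;      // simulates the HDFS error

    RollingWriter(String path, Map<String, RollingWriter> registry) {
      this.path = path;
      this.registry = registry;
    }

    // Stand-in for the close callback that removes this writer from the map.
    void runCloseAction() {
      registry.remove(path, this);
    }

    void close() throws IOException {
      if (failOnClose) {
        throw new IOException("could not close " + path);
      }
      closed = true;
      runCloseAction();
    }

    // Schedule the timed roll; the catch block applies the proposed fix.
    ScheduledFuture<Void> scheduleRoll(ScheduledExecutorService pool, long delaySec) {
      return pool.schedule(() -> {
        try {
          close();
        } catch (Throwable t) {
          System.err.println("Unexpected error: " + t.getMessage());
          runCloseAction(); // drop the reference even though close failed
          closed = true;
        }
        return null;
      }, delaySec, TimeUnit.SECONDS);
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, RollingWriter> sfWriters = new ConcurrentHashMap<>();
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    RollingWriter w = new RollingWriter("2015/07/01", sfWriters);
    sfWriters.put(w.path, w);
    w.failOnClose = true;
    w.scheduleRoll(pool, 0).get(); // wait for the roll to finish
    pool.shutdown();
    System.out.println(sfWriters.containsKey("2015/07/01")); // false
    System.out.println(w.closed);                            // true
  }
}
```

Note that this only drops the in-memory reference; the underlying HDFS file that failed to close may still be left unclosed.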
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)