[
https://issues.apache.org/jira/browse/FLUME-1289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13396605#comment-13396605
]
Inder SIngh commented on FLUME-1289:
------------------------------------
@Mike
If we catch an invalid config (e.g. a bad hdfs.path) during validateSinks() in
FlumeConfiguration, we can fail early without starting incorrectly
configured sinks.
Since reloading the config at run time is not supported, we expect the user
to restart the service after fixing the config anyway.
I wanted to support this check much earlier, i.e. in
FlumeConfiguration -> private Set<String> validateSinks(Set<String> channelSet)
We already know the SinkType there, so the suggestion was something like:
    SinkType sinkType = getKnownSink(sinkContext.getString(
        BasicConfigurationConstants.CONFIG_TYPE));
    // Enum constants can be compared with ==, which also tolerates a null sinkType.
    if (sinkType == SinkType.HDFS) {
      validateHDFSSink();
    }
validateHDFSSink() can then act as a placeholder for all HDFS-specific checks:
1. Check that the NN is correct.
2. Check path permissions.
3. ...anything else...
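
To make check 1 concrete, here is a minimal standalone sketch of what such a validation could look like. It is not existing Flume code: the class HdfsSinkConfigCheck, the method validateHdfsPath() and the injectable hostResolves predicate are hypothetical names used only for illustration. It only checks that the NameNode host in hdfs.path resolves, and it assumes that a path without an explicit authority falls back to the default file system. Permission checks (item 2) would need the Hadoop FileSystem API and are left out.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper, not part of Flume: early sanity check for hdfs.path.
public class HdfsSinkConfigCheck {

  /** Default resolver: true if the host name resolves on this machine. */
  public static boolean resolves(String host) {
    try {
      InetAddress.getByName(host);
      return true;
    } catch (UnknownHostException e) {
      return false;
    }
  }

  /**
   * Returns a list of problems found in hdfsPath; an empty list means
   * nothing suspicious was detected. The resolver is passed in so the
   * check can be exercised without real DNS lookups.
   */
  public static List<String> validateHdfsPath(String hdfsPath,
      Predicate<String> hostResolves) {
    List<String> problems = new ArrayList<>();
    if (hdfsPath == null || hdfsPath.trim().isEmpty()) {
      problems.add("hdfs.path is not set");
      return problems;
    }
    int schemeEnd = hdfsPath.indexOf("://");
    if (schemeEnd < 0) {
      // No explicit NameNode; assumed to use the default file system.
      return problems;
    }
    // Parse only scheme://authority, because the rest of the path may
    // contain escape sequences such as %Y that are not valid in a URI.
    int authorityEnd = hdfsPath.indexOf('/', schemeEnd + 3);
    String prefix = authorityEnd < 0
        ? hdfsPath : hdfsPath.substring(0, authorityEnd);
    if (prefix.length() == schemeEnd + 3) {
      // Empty authority (e.g. hdfs:///flume); assumed to use the default NN.
      return problems;
    }
    try {
      String host = new URI(prefix).getHost();
      if (host == null) {
        problems.add("hdfs.path has no valid NameNode host: " + prefix);
      } else if (!hostResolves.test(host)) {
        problems.add("NameNode host does not resolve: " + host);
      }
    } catch (URISyntaxException e) {
      problems.add("hdfs.path is not a valid URI: " + e.getMessage());
    }
    return problems;
  }

  public static void main(String[] args) {
    String path = args.length > 0 ? args[0] : "hdfs://localhost:8020/flume/%Y-%m-%d";
    List<String> problems = validateHdfsPath(path, HdfsSinkConfigCheck::resolves);
    System.out.println(problems.isEmpty() ? "hdfs.path looks OK" : problems.toString());
  }
}
```

validateSinks() could log the returned problems and drop the sink from the valid set, so a wrong NN hostname shows up at startup instead of on the first append.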
> HDFSEventSink doesn't log exception when config points to incorrect NN
> hostname
> -------------------------------------------------------------------------------
>
> Key: FLUME-1289
> URL: https://issues.apache.org/jira/browse/FLUME-1289
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Reporter: Mubarak Seyed
>
> Testing shows that if the hdfs sink path points to an incorrect {{NameNode}}
> hostname, no exception is logged when the {{BucketWriter}} append happens, so
> there is no easy way to debug and correct the hdfs path in the config file
> of the hdfs sink.
> It looks like the exception is swallowed somewhere.
> Do we need to catch Exception here?
> {code}
>   private static <T> T callWithTimeout(final ExecutorService executor,
>       long timeout, final Callable<T> callable)
>       throws IOException, InterruptedException {
>     Future<T> future = executor.submit(callable);
>     try {
>       if (timeout > 0) {
>         return future.get(timeout, TimeUnit.MILLISECONDS);
>       } else {
>         return future.get();
>       }
>     } catch (TimeoutException eT) {
>       future.cancel(true);
>       throw new IOException("Callable timed out", eT);
>     } catch (ExecutionException e1) {
>       Throwable cause = e1.getCause();
>       if (cause instanceof IOException) {
>         throw (IOException) cause;
>       } else if (cause instanceof InterruptedException) {
>         throw (InterruptedException) cause;
>       } else if (cause instanceof RuntimeException) {
>         throw (RuntimeException) cause;
>       } else if (cause instanceof Error) {
>         throw (Error) cause;
>       } else {
>         throw new RuntimeException(e1);
>       }
>     } catch (CancellationException ce) {
>       throw new InterruptedException(
>           "Blocked callable interrupted by rotation event");
>     } catch (InterruptedException ex) {
>       LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
>       throw ex;
>     }
>   }
> {code}