Updated Branches: refs/heads/trunk 822e120ae -> d20c94ca6
FLUME-1676: ExecSource should provide a configurable charset (Nitin Verma via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d20c94ca Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d20c94ca Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d20c94ca Branch: refs/heads/trunk Commit: d20c94ca61103632de2cd941a716dbd4d9c6d719 Parents: 822e120 Author: Brock Noland <[email protected]> Authored: Thu Dec 13 13:37:32 2012 -0600 Committer: Brock Noland <[email protected]> Committed: Thu Dec 13 13:37:32 2012 -0600 ---------------------------------------------------------------------- .../java/org/apache/flume/source/ExecSource.java | 20 +++++++++++--- .../source/ExecSourceConfigurationConstants.java | 8 ++++++ 2 files changed, 23 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d20c94ca/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java index 46f672f..495b03f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java @@ -42,6 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import java.nio.charset.Charset; /** * <p> @@ -149,6 +150,7 @@ Configurable { private boolean logStderr; private Integer bufferCount; private ExecRunnable runner; + private Charset charset; @Override public void start() { @@ -158,7 +160,7 @@ Configurable { counterGroup = new CounterGroup(); runner = new ExecRunnable(command, getChannelProcessor(), counterGroup, - restart, restartThrottle, logStderr, bufferCount); + 
restart, restartThrottle, logStderr, bufferCount, charset); // FIXME: Use a callback-like executor / future to signal us upon failure. runnerFuture = executor.submit(runner); @@ -224,13 +226,16 @@ Configurable { bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE); + + charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET, + ExecSourceConfigurationConstants.DEFAULT_CHARSET)); } private static class ExecRunnable implements Runnable { public ExecRunnable(String command, ChannelProcessor channelProcessor, CounterGroup counterGroup, boolean restart, long restartThrottle, - boolean logStderr, int bufferCount) { + boolean logStderr, int bufferCount, Charset charset) { this.command = command; this.channelProcessor = channelProcessor; this.counterGroup = counterGroup; @@ -238,6 +243,7 @@ Configurable { this.bufferCount = bufferCount; this.restart = restart; this.logStderr = logStderr; + this.charset = charset; } private String command; @@ -247,6 +253,7 @@ Configurable { private long restartThrottle; private int bufferCount; private boolean logStderr; + private Charset charset; private Process process = null; @Override @@ -258,11 +265,11 @@ Configurable { String[] commandArgs = command.split("\\s+"); process = new ProcessBuilder(commandArgs).start(); reader = new BufferedReader( - new InputStreamReader(process.getInputStream())); + new InputStreamReader(process.getInputStream(), charset)); // StderrLogger dies as soon as the input stream is invalid StderrReader stderrReader = new StderrReader(new BufferedReader( - new InputStreamReader(process.getErrorStream())), logStderr); + new InputStreamReader(process.getErrorStream(), charset)), logStderr); stderrReader.setName("StderrReader-[" + command + "]"); stderrReader.setDaemon(true); stderrReader.start(); @@ -271,7 +278,7 @@ Configurable { List<Event> eventList = new ArrayList<Event>(); while ((line = 
reader.readLine()) != null) { counterGroup.incrementAndGet("exec.lines.read"); - eventList.add(EventBuilder.withBody(line.getBytes())); + eventList.add(EventBuilder.withBody(line.getBytes(charset))); if(eventList.size() >= bufferCount) { channelProcessor.processEventBatch(eventList); eventList.clear(); @@ -340,6 +347,9 @@ Configurable { String line = null; while((line = input.readLine()) != null) { if(logStderr) { + // There is no need to read 'line' with a charset + // as we do not to propagate it. + // It is in UTF-16 and would be printed in UTF-8 format. logger.info("StderrLogger[{}] = '{}'", ++i, line); } } http://git-wip-us.apache.org/repos/asf/flume/blob/d20c94ca/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java index 0ba0508..1b35b01 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java @@ -18,6 +18,8 @@ */ package org.apache.flume.source; +import java.nio.charset.Charset; + public class ExecSourceConfigurationConstants { /** @@ -43,4 +45,10 @@ public class ExecSourceConfigurationConstants { */ public static final String CONFIG_BATCH_SIZE = "batchSize"; public static final int DEFAULT_BATCH_SIZE = 20; + + /** + * Charset for reading input + */ + public static final String CHARSET = "charset"; + public static final String DEFAULT_CHARSET = "UTF-8"; }
