[ 
https://issues.apache.org/jira/browse/FLUME-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965526#comment-13965526
 ] 

Keith Wright commented on FLUME-2360:
-------------------------------------

FYI, as a workaround for now I have implemented my own line deserializer, which 
uses the standard BufferedReader.readLine.  It is technically not durable 
(mark() and reset() are no-ops), but since our channel is a file channel (with 
a VERY high capacity) I have never actually had issues with the source adding 
events to it.

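import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.List;

import com.google.common.base.Charsets; // Guava, assumed to be on the Flume classpath
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableFileInputStream;
import org.apache.flume.serialization.ResettableInputStream;

// Nested inside an enclosing class (not shown).
public static class MyLineDeserializer implements EventDeserializer.Builder {

        @Override
        public EventDeserializer build(Context context, ResettableInputStream in) {
            try {
                // Pull the underlying File out of the stream via reflection,
                // then bypass the stream entirely and read the file directly.
                Field fileField =
                        ResettableFileInputStream.class.getDeclaredField("file");
                fileField.setAccessible(true);
                in.close();
                return new MyLineReader((File) fileField.get(in));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        static class MyLineReader implements EventDeserializer {

            final FileInputStream fileInputStream;
            final InputStreamReader inputStreamReader;
            final BufferedReader reader;

            MyLineReader(File file) throws Exception {
                fileInputStream = new FileInputStream(file);
                // Decode explicitly as UTF-8 to match the event body encoding
                // below, rather than relying on the platform default charset.
                inputStreamReader =
                        new InputStreamReader(fileInputStream, Charsets.UTF_8);
                reader = new BufferedReader(inputStreamReader);
            }

            @Override
            public Event readEvent() throws IOException {
                List<Event> eventList = readEvents(1);
                return eventList.isEmpty() ? null : eventList.get(0);
            }

            @Override
            public List<Event> readEvents(int numEvents) throws IOException {
                String line;
                List<Event> eventList = new LinkedList<Event>();
                // Check the count BEFORE reading; otherwise a line is consumed
                // and silently dropped once the batch is full.
                while (eventList.size() < numEvents
                        && (line = reader.readLine()) != null) {
                    eventList.add(EventBuilder.withBody(line, Charsets.UTF_8));
                }
                return eventList;
            }

            @Override
            public void mark() throws IOException {
                // do nothing: the read position is not persisted (not durable)
            }

            @Override
            public void reset() throws IOException {
                // do nothing
            }

            @Override
            public void close() throws IOException {
                // Closing the outermost reader also closes the wrapped streams.
                reader.close();
            }
        }
}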

> Spooling Source: LineDeserializer incorrectly stops reading events if UTF-8 
> char occurs at end of buffer
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2360
>                 URL: https://issues.apache.org/jira/browse/FLUME-2360
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0
>         Environment: CDH 4.4
>            Reporter: Keith Wright
>         Attachments: FBX_BID.80.log.2014-04-10-00-47.log
>
>
> We are using a Spooling source to read from files and have noticed that there 
> are situations where the source will stop reading in the middle of a file 
> (see attached example file).  After much debugging, I have determined that 
> this occurs if the buffer in ResettableFileInputStream happens to fill while 
> in the middle of a multi-byte UTF-8 character.  In this case readChar() 
> returns -1, which ends the line-reading loop in LineDeserializer.  Running 
> the spool source with the attached file should result in more than 5400 
> events in the channel, but it currently yields only ~3300.
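
To illustrate the failure mode described in the issue, here is a minimal sketch using only java.nio. It is not Flume's ResettableFileInputStream code; the class and method names (SplitUtf8Demo, undecodedBytesAtBoundary, decodeInChunks) are made up for this example. It shows that when a buffer ends in the middle of a multi-byte UTF-8 character, the decoder leaves the partial bytes undecoded, and that a reader should carry those bytes over to the next buffer fill instead of treating the situation as end of input.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

final class SplitUtf8Demo {

    /**
     * Decodes one buffer's worth of bytes, telling the decoder that more
     * input may follow. Returns how many trailing bytes were left undecoded
     * because they form an incomplete UTF-8 sequence.
     */
    static int undecodedBytesAtBoundary(byte[] bytes) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
        ByteBuffer in = ByteBuffer.wrap(bytes);
        CharBuffer out = CharBuffer.allocate(bytes.length);
        // endOfInput = false: a truncated sequence is underflow, not an error
        decoder.decode(in, out, false);
        return in.remaining();
    }

    /**
     * Decodes data in fixed-size chunks (chunkSize >= 1), carrying any
     * incomplete trailing bytes over into the next chunk.
     */
    static String decodeInChunks(byte[] data, int chunkSize) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
        // Up to 3 bytes of an incomplete character may be carried over.
        ByteBuffer buf = ByteBuffer.allocate(chunkSize + 4);
        // UTF-8 never produces more chars than it has bytes.
        CharBuffer out = CharBuffer.allocate(data.length);
        int offset = 0;
        do {
            int n = Math.min(chunkSize, data.length - offset);
            buf.put(data, offset, n);
            offset += n;
            buf.flip();
            decoder.decode(buf, out, offset == data.length);
            buf.compact(); // keep the undecoded tail for the next round
        } while (offset < data.length);
        decoder.flush(out);
        out.flip();
        return out.toString();
    }
}
```

For example, the string "ab\u00e9" is four bytes in UTF-8 because the last character takes two. Passing only the first three bytes to undecodedBytesAtBoundary leaves one byte undecoded, while decodeInChunks with a chunk size of 3 still recovers the full string because the dangling byte is carried into the second chunk.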



--
This message was sent by Atlassian JIRA
(v6.2#6252)
