josephglanville edited a comment on issue #5584: Decoupling FirehoseFactory and 
InputRowParser
URL: 
https://github.com/apache/incubator-druid/issues/5584#issuecomment-423473524
 
 
   What I came up with was this:
   
   First, a new interface that reads a file and emits rows that can be passed to parsers:
   ```java
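   public interface FirehoseReader<T>
   {
        // ReaderIteratingFirehose may call this more than once per record,
        // so implementations should not advance on every call.
        boolean hasNext();
        // Returns the current record; call only after hasNext() returned true.
        T next();
        // Releases the underlying stream.
        void close();
   }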
   ```
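
   To make the contract concrete, here is a minimal sketch of another implementation, assuming plain line-delimited text as input. `LineFirehoseReader` and `LineFirehoseReaderDemo` are hypothetical names used only for illustration, and the interface is repeated so the sketch compiles on its own. It keeps a one-line look-ahead so that calling `hasNext()` repeatedly does not skip input.

   ```java
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.Reader;
   import java.io.StringReader;
   import java.io.UncheckedIOException;
   import java.util.NoSuchElementException;

   // Copy of the interface proposed above, repeated so the sketch is self-contained.
   interface FirehoseReader<T> {
       boolean hasNext();
       T next();
       void close();
   }

   // Hypothetical example implementation: emits one String per line of text.
   class LineFirehoseReader implements FirehoseReader<String> {
       private final BufferedReader in;
       // Next line, already read from the stream but not yet returned by next()
       private String lookahead;

       LineFirehoseReader(Reader source) {
           this.in = new BufferedReader(source);
       }

       @Override
       public boolean hasNext() {
           // Read ahead only when nothing is buffered, so repeated calls are safe.
           if (lookahead == null) {
               try {
                   lookahead = in.readLine();
               } catch (IOException e) {
                   throw new UncheckedIOException(e);
               }
           }
           return lookahead != null;
       }

       @Override
       public String next() {
           if (!hasNext()) {
               throw new NoSuchElementException();
           }
           String line = lookahead;
           lookahead = null;
           return line;
       }

       @Override
       public void close() {
           try {
               in.close();
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }

   class LineFirehoseReaderDemo {
       public static void main(String[] args) {
           FirehoseReader<String> reader = new LineFirehoseReader(new StringReader("a,1\nb,2\n"));
           while (reader.hasNext()) {
               System.out.println(reader.next());
           }
           reader.close();
       }
   }
   ```

   A reader like this could be paired with any parser that accepts `String` input, in the same way the sequence file reader is paired with `ByteBuffer` parsers.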
   For our case we implement a `FirehoseReader` that reads sequence files and 
emits `ByteBuffer` so that any `ByteBufferInputRowParser` implementations can 
be used with it:
   ```java
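   import java.io.Closeable;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.UncheckedIOException;
   import java.nio.ByteBuffer;
   import java.util.NoSuchElementException;

   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FSDataInputStream;
   import org.apache.hadoop.io.BytesWritable;
   import org.apache.hadoop.io.SequenceFile.Reader;
   import org.apache.hadoop.io.Writable;
   import org.apache.hadoop.util.ReflectionUtils;

   public class SequenceFileFirehoseReader implements FirehoseReader<ByteBuffer> {
        private final Reader reader;
        private final Closeable closer;
        // Reusable key instance; the key contents are ignored
        private final Writable ignoreKey;
        // True while the reader sits on a record that next() has not consumed yet
        private boolean positioned = false;

        public SequenceFileFirehoseReader(InputStream inputStream) throws IOException {
                this(inputStream, null);
        }

        public SequenceFileFirehoseReader(InputStream inputStream, Closeable closer) throws IOException {
                Configuration config = new Configuration();
                config.set("io.compression.codecs", "io.airlift.compress.snappy.SnappyCodec");
                // FSDataInputStream requires the wrapped stream to implement
                // Seekable and PositionedReadable
                FSDataInputStream fsDataInputStream = new FSDataInputStream(inputStream);
                this.reader = new Reader(config, Reader.stream(fsDataInputStream));
                this.ignoreKey = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config);
                this.closer = closer;
        }

        @Override
        public boolean hasNext() {
                // Advance at most once per record so repeated calls do not skip data
                if (!positioned) {
                        try {
                                positioned = reader.next(ignoreKey);
                        } catch (IOException e) {
                                // Surface read failures instead of silently ending the stream
                                throw new UncheckedIOException(e);
                        }
                }
                return positioned;
        }

        @Override
        public ByteBuffer next() {
                if (!hasNext()) {
                        throw new NoSuchElementException();
                }
                BytesWritable value = new BytesWritable();
                try {
                        reader.getCurrentValue(value);
                } catch (IOException e) {
                        throw new UncheckedIOException(e);
                }
                positioned = false;
                // getBytes() returns the backing array, so bound the buffer by getLength()
                return ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
        }

        @Override
        public void close() {
                try {
                        reader.close();
                } catch (IOException ignored) {
                }
                // The single-argument constructor passes a null closer
                if (closer != null) {
                        try {
                                closer.close();
                        } catch (IOException ignored) {
                        }
                }
        }
   }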
   ```
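
   One detail in `next()` is worth spelling out: `BytesWritable.getBytes()` returns the backing array, which can be longer than the valid data, so the buffer is bounded with `getLength()`. The sketch below illustrates the difference with a plain `byte[]` standing in for `BytesWritable`, so it runs without Hadoop; `WrapLengthDemo`, `backing` and `validLength` are illustrative names only.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   class WrapLengthDemo {
       // Decodes only the bytes between the buffer's position and limit.
       static String decode(ByteBuffer buffer) {
           return StandardCharsets.UTF_8.decode(buffer).toString();
       }

       public static void main(String[] args) {
           // Stand-in for a BytesWritable: 8 bytes of capacity, 3 valid bytes.
           byte[] backing = new byte[8];
           byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);
           System.arraycopy(payload, 0, backing, 0, payload.length);
           int validLength = payload.length;

           ByteBuffer bounded = ByteBuffer.wrap(backing, 0, validLength);
           ByteBuffer unbounded = ByteBuffer.wrap(backing);

           System.out.println(bounded.remaining());   // 3
           System.out.println(unbounded.remaining()); // 8
           System.out.println(decode(bounded));       // abc
       }
   }
   ```

   Wrapping the whole array would hand trailing padding bytes to the parser along with the record.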
   
   The reader is then coupled with `ReaderIteratingFirehose`, which can be returned from `FirehoseFactory.connect()`:
   ```java
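   import java.io.Closeable;
   import java.io.IOException;
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   import javax.annotation.Nullable;

   // Firehose, InputRow, InputRowParser and Runnables come from Druid;
   // their package names depend on the Druid version.
   public class ReaderIteratingFirehose<T> implements Firehose {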
        private final Iterator<FirehoseReader<T>> readers;
        private final Closeable closer;
        private FirehoseReader<T> reader = null;
        private InputRowParser<T> parser;
        private Iterator<InputRow> rowIterator;
   
        public ReaderIteratingFirehose(Iterator<FirehoseReader<T>> readers, InputRowParser<T> parser) {
                this(readers, parser, null);
        }
   
        public ReaderIteratingFirehose(
                Iterator<FirehoseReader<T>> readers,
                InputRowParser<T> parser,
                Closeable closer
        ) {
                this.readers = readers;
                this.parser = parser;
                this.closer = closer;
        }
   
        @Override
        public boolean hasMore() {
                if (rowIterator != null && rowIterator.hasNext()) {
                        return true;
                }
                while ((reader == null || !reader.hasNext()) && readers.hasNext()) {
                        reader = getNextReader();
                }
   
                return reader != null && reader.hasNext();
        }
   
        @Nullable
        @Override
        public InputRow nextRow() {
                if (!hasMore()) {
                        throw new NoSuchElementException();
                }
   
                // If we don't have any more parsed rows available, parse a new batch
                if (rowIterator == null || !rowIterator.hasNext()) {
                        rowIterator = parser.parseBatch(reader.next()).iterator();
                }
   
                return rowIterator.next();
        }
   
        private FirehoseReader<T> getNextReader() {
                if (reader != null) {
                        reader.close();
                }
   
                return readers.next();
        }
   
        @Override
        public Runnable commit() {
                return Runnables.getNoopRunnable();
        }
   
        @Override
        public void close() throws IOException {
                try {
                        if (reader != null) {
                                reader.close();
                        }
                } catch (Throwable t) {
                        try {
                                if (closer != null) {
                                        closer.close();
                                }
                        } catch (Exception e) {
                                t.addSuppressed(e);
                        }
                        throw t;
                }
                if (closer != null) {
                        closer.close();
                }
        }
   }
   ```
   For example, to use it with the `SequenceFileFirehoseReader` above (the snippet is Kotlin):
   ```kotlin
                return ReaderIteratingFirehose<ByteBuffer>(
                        object : Iterator<SequenceFileFirehoseReader> {
                                override fun hasNext(): Boolean {
                                        return fetcher.hasNext()
                                }

                                override fun next(): SequenceFileFirehoseReader {
                                        if (!hasNext()) {
                                                throw NoSuchElementException()
                                        }

                                        val openedObject = fetcher.next()
                                        val stream: InputStream
                                        try {
                                                stream = wrapObjectStream(
                                                        openedObject,
                                                        openObjectStream(openedObject)
                                                )
                                        } catch (e: IOException) {
                                                throw RuntimeException(e)
                                        }

                                        // Keep the expression on the same line as `return`;
                                        // a bare `return` followed by a newline returns Unit
                                        return SequenceFileFirehoseReader(stream)
                                }
                        },
                        parser
                )
   ```
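
   The reader-chaining idea can also be exercised without Druid or Hadoop on the classpath. The following is a sketch under stated assumptions, not the `ReaderIteratingFirehose` code itself: `ListReader`, `ChainedRows` and `ChainedRowsDemo` are hypothetical stand-ins, and a `Function<T, List<R>>` plays the role of `InputRowParser.parseBatch`. It is a variation on the approach above in that it parses inside `hasMore()`, so empty readers and empty batches are skipped before a row is promised.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.function.Function;

   // Copy of the proposed interface so the sketch is self-contained.
   interface FirehoseReader<T> {
       boolean hasNext();
       T next();
       void close();
   }

   // Hypothetical in-memory reader used only for this demonstration.
   class ListReader<T> implements FirehoseReader<T> {
       private final Iterator<T> items;
       boolean closed = false;

       ListReader(List<T> items) { this.items = items.iterator(); }

       @Override public boolean hasNext() { return items.hasNext(); }
       @Override public T next() { return items.next(); }
       @Override public void close() { closed = true; }
   }

   // Flattens many readers into one stream of parsed rows.
   class ChainedRows<T, R> {
       private final Iterator<? extends FirehoseReader<T>> readers;
       private final Function<T, List<R>> parseBatch; // stand-in for a parser
       private FirehoseReader<T> reader;
       private Iterator<R> rows = Collections.emptyIterator();

       ChainedRows(Iterator<? extends FirehoseReader<T>> readers, Function<T, List<R>> parseBatch) {
           this.readers = readers;
           this.parseBatch = parseBatch;
       }

       boolean hasMore() {
           // Pull records, then readers, until a parsed row is available.
           while (!rows.hasNext()) {
               if (reader != null && reader.hasNext()) {
                   rows = parseBatch.apply(reader.next()).iterator();
               } else if (readers.hasNext()) {
                   if (reader != null) {
                       reader.close(); // finished with the previous reader
                   }
                   reader = readers.next();
               } else {
                   return false;
               }
           }
           return true;
       }

       R nextRow() {
           if (!hasMore()) {
               throw new NoSuchElementException();
           }
           return rows.next();
       }

       void close() {
           if (reader != null) {
               reader.close();
           }
       }
   }

   class ChainedRowsDemo {
       // Toy parser: one row per space-separated word, no rows for an empty record.
       static List<String> splitWords(String record) {
           if (record.isEmpty()) {
               return Collections.<String>emptyList();
           }
           return Arrays.asList(record.split(" "));
       }

       public static void main(String[] args) {
           List<ListReader<String>> readers = new ArrayList<>();
           readers.add(new ListReader<String>(Arrays.asList("a b", "")));
           readers.add(new ListReader<String>(Collections.<String>emptyList()));
           readers.add(new ListReader<String>(Arrays.asList("c")));

           ChainedRows<String, String> rows =
               new ChainedRows<String, String>(readers.iterator(), ChainedRowsDemo::splitWords);
           while (rows.hasMore()) {
               System.out.println(rows.nextRow());
           }
           rows.close();
       }
   }
   ```

   Each reader is closed as soon as the next one is opened, and the last one is closed by `close()`, which mirrors the lifecycle in `getNextReader()` and `close()` above.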
   
   There is still a lot of work to be done, though. Beyond fixing up `StringInputRowParser`, the prefetching and file-caching firehose code would also need to be refactored.

   I *should* have some more bandwidth for this soon, and any mentoring or advice on how to deal with the `StringInputRowParser` situation would be greatly appreciated, as we would like to get this work upstream.
