What I came up with was this:

First, a new interface that handles reading a file and emitting rows that can
be passed to parsers:
```
public interface FirehoseReader<T>
{
        boolean hasNext();
        T next();
        void close();
}
```
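To make the contract concrete, here is a minimal sketch of an in-memory
implementation. `ListFirehoseReader` and its `isClosed()` helper are
hypothetical names used only for illustration, and the interface is repeated
so the snippet compiles on its own. The point to notice is that `hasNext()`
has no side effects, so a caller may check it more than once before calling
`next()`.

```java
import java.util.Iterator;
import java.util.List;

// Repeated from above so the sketch is self-contained.
interface FirehoseReader<T> {
        boolean hasNext();
        T next();
        void close();
}

// Hypothetical in-memory reader, for illustration only.
class ListFirehoseReader<T> implements FirehoseReader<T> {
        private final Iterator<T> it;
        private boolean closed = false;

        ListFirehoseReader(List<T> items) {
                this.it = items.iterator();
        }

        @Override
        public boolean hasNext() {
                // No side effects: calling this twice does not consume an item.
                return !closed && it.hasNext();
        }

        @Override
        public T next() {
                return it.next();
        }

        @Override
        public void close() {
                closed = true;
        }

        boolean isClosed() {
                return closed;
        }
}
```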
For our case we implement a `FirehoseReader` that reads sequence files and 
emits `ByteBuffer` so that any `ByteBufferInputRowParser` implementations can 
be used with it:
```
public class SequenceFileFirehoseReader implements FirehoseReader<ByteBuffer> {
        private Reader reader;
        private Closeable closer;

        public SequenceFileFirehoseReader(InputStream inputStream) throws IOException {
                this(inputStream, null);
        }

        public SequenceFileFirehoseReader(InputStream inputStream, Closeable closer) throws IOException {
                Configuration config = new Configuration();
                config.set("io.compression.codecs", "io.airlift.compress.snappy.SnappyCodec");
                FSDataInputStream fsDataInputStream = new FSDataInputStream(inputStream);
                Option isOption = Reader.stream(fsDataInputStream);
                this.reader = new Reader(config, isOption);
                this.closer = closer;
        }

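        // Track whether hasNext() has already advanced the underlying reader,
        // so that repeated hasNext() calls do not skip records.
        private boolean advanced = false;
        private boolean available = false;

        @Override
        public boolean hasNext() {
                if (!advanced) {
                        try {
                                Writable ignoreKey = (Writable) reader.getKeyClass().newInstance();
                                available = reader.next(ignoreKey);
                        } catch (IOException | InstantiationException | IllegalAccessException e) {
                                // Read errors are treated as end of input.
                                available = false;
                        }
                        advanced = true;
                }
                return available;
        }

        @Override
        public ByteBuffer next() {
                if (!hasNext()) {
                        throw new NoSuchElementException();
                }
                // Consume the record that hasNext() positioned the reader on.
                advanced = false;
                BytesWritable value = new BytesWritable();
                try {
                        reader.getCurrentValue(value);
                } catch (IOException ignored) {
                        return null;
                }
                return ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
        }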

        @Override
        public void close() {
                try {
                        reader.close();
                } catch (IOException ignored) {
                }
                // closer is null when the single-argument constructor is used.
                if (closer != null) {
                        try {
                                closer.close();
                        } catch (IOException ignored) {
                        }
                }
        }
}
```

This is then coupled with `ReaderIteratingFirehose`, which can be returned from
`FirehoseFactory.connect()`:
```
public class ReaderIteratingFirehose<T> implements Firehose {
        private final Iterator<FirehoseReader<T>> readers;
        private final Closeable closer;
        private FirehoseReader<T> reader = null;
        private InputRowParser<T> parser;
        private Iterator<InputRow> rowIterator;

        public ReaderIteratingFirehose(Iterator<FirehoseReader<T>> readers, InputRowParser<T> parser) {
                this(readers, parser, null);
        }

        public ReaderIteratingFirehose(
                Iterator<FirehoseReader<T>> readers,
                InputRowParser<T> parser,
                Closeable closer
        ) {
                this.readers = readers;
                this.parser = parser;
                this.closer = closer;
        }

        @Override
        public boolean hasMore() {
                if (rowIterator != null && rowIterator.hasNext()) {
                        return true;
                }
                while ((reader == null || !reader.hasNext()) && readers.hasNext()) {
                        reader = getNextReader();
                }

                return reader != null && reader.hasNext();
        }

        @Nullable
        @Override
        public InputRow nextRow() {
                if (!hasMore()) {
                        throw new NoSuchElementException();
                }

                // If we don't have any more parsed rows available, parse a new batch
                if (rowIterator == null || !rowIterator.hasNext()) {
                        rowIterator = parser.parseBatch(reader.next()).iterator();
                }

                return rowIterator.next();
        }

        private FirehoseReader<T> getNextReader() {
                if (reader != null) {
                        reader.close();
                }

                return readers.next();
        }

        @Override
        public Runnable commit() {
                return Runnables.getNoopRunnable();
        }

        @Override
        public void close() throws IOException {
                try {
                        if (reader != null) {
                                reader.close();
                        }
                } catch (Throwable t) {
                        try {
                                if (closer != null) {
                                        closer.close();
                                }
                        } catch (Exception e) {
                                t.addSuppressed(e);
                        }
                        throw t;
                }
                if (closer != null) {
                        closer.close();
                }
        }
}
```
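The reader-switching loop in `hasMore()` can be tried in isolation. The sketch
below is not Druid code: `ListReader` and `ReaderChainDemo.drain` are
hypothetical stand-ins with no parser involved, and they assume a reader whose
`hasNext()` can be called repeatedly without consuming anything. It shows that
empty readers in the middle of the sequence are skipped and that each reader is
closed before moving on to the next one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Repeated from above so the sketch is self-contained.
interface FirehoseReader<T> {
        boolean hasNext();
        T next();
        void close();
}

// Hypothetical stand-in reader backed by a list.
class ListReader<T> implements FirehoseReader<T> {
        private final Iterator<T> it;

        ListReader(List<T> items) {
                this.it = items.iterator();
        }

        public boolean hasNext() { return it.hasNext(); }
        public T next() { return it.next(); }
        public void close() { }
}

class ReaderChainDemo {
        // Same reader-switching logic as hasMore(), minus the parsing step.
        static <T> List<T> drain(Iterator<FirehoseReader<T>> readers) {
                List<T> out = new ArrayList<>();
                FirehoseReader<T> reader = null;
                while (true) {
                        // Advance past exhausted (or empty) readers.
                        while ((reader == null || !reader.hasNext()) && readers.hasNext()) {
                                if (reader != null) {
                                        reader.close();
                                }
                                reader = readers.next();
                        }
                        if (reader == null || !reader.hasNext()) {
                                break;
                        }
                        out.add(reader.next());
                }
                if (reader != null) {
                        reader.close();
                }
                return out;
        }
}
```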
For example, to use it with the `SequenceFileFirehoseReader` above (in Kotlin):
```
                return ReaderIteratingFirehose<ByteBuffer>(
                        object : Iterator<SequenceFileFirehoseReader> {
                                override fun hasNext(): Boolean {
                                        return fetcher.hasNext()
                                }

                                override fun next(): SequenceFileFirehoseReader {
                                        if (!hasNext()) {
                                                throw NoSuchElementException()
                                        }

                                        val openedObject = fetcher.next()
                                        val stream: InputStream
                                        try {
                                                stream = wrapObjectStream(
                                                        openedObject,
                                                        openObjectStream(openedObject)
                                                )
                                        } catch (e: IOException) {
                                                throw RuntimeException(e)
                                        }

                                        return SequenceFileFirehoseReader(stream)
                                }
                        },
                        parser
                )
```

There is still a lot of work to be done, though. Beyond fixing up
`StringInputRowParser`, the prefetching and file-caching firehose code would
also need to be refactored.

I *should* have some more bandwidth for this soon. Any mentoring or advice on
how to deal with the `StringInputRowParser` situation would be greatly
appreciated, as we would like to get this work upstream.

[ Full content available at: 
https://github.com/apache/incubator-druid/issues/5584 ]