josephglanville edited a comment on issue #5584: Decoupling FirehoseFactory and InputRowParser
URL: https://github.com/apache/incubator-druid/issues/5584#issuecomment-423473524

What I came up with was this:

First, a new interface that handles reading a file and emitting records that can be passed to parsers:

```java
public interface FirehoseReader<T> {
  boolean hasNext();

  T next();

  void close();
}
```

For our case we implement a `FirehoseReader` that reads sequence files and emits `ByteBuffer`, so that any `ByteBufferInputRowParser` implementation can be used with it:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Reader.Option;
import org.apache.hadoop.io.Writable;

public class SequenceFileFirehoseReader implements FirehoseReader<ByteBuffer> {
  private final Reader reader;
  private final Closeable closer; // may be null

  // Reader.next(key) advances the file, and ReaderIteratingFirehose may call
  // hasNext() several times before next(), so cache the result until it is consumed.
  private boolean advanced = false;
  private boolean hasRecord = false;

  public SequenceFileFirehoseReader(InputStream inputStream) throws IOException {
    this(inputStream, null);
  }

  public SequenceFileFirehoseReader(InputStream inputStream, Closeable closer) throws IOException {
    Configuration config = new Configuration();
    config.set("io.compression.codecs", "io.airlift.compress.snappy.SnappyCodec");
    // FSDataInputStream requires the wrapped stream to implement Seekable and PositionedReadable.
    FSDataInputStream fsDataInputStream = new FSDataInputStream(inputStream);
    Option isOption = Reader.stream(fsDataInputStream);
    this.reader = new Reader(config, isOption);
    this.closer = closer;
  }

  @Override
  public boolean hasNext() {
    if (!advanced) {
      try {
        Writable ignoreKey = (Writable) reader.getKeyClass().newInstance();
        hasRecord = reader.next(ignoreKey);
      } catch (IOException | InstantiationException | IllegalAccessException e) {
        // Read errors are treated as end of input.
        hasRecord = false;
      }
      advanced = true;
    }
    return hasRecord;
  }

  @Override
  public ByteBuffer next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    advanced = false; // the cached record is consumed by this call
    BytesWritable value = new BytesWritable();
    try {
      reader.getCurrentValue(value);
    } catch (IOException ignored) {
      return null;
    }
    return ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
  }

  @Override
  public void close() {
    try {
      reader.close();
    } catch (IOException ignored) {
    }
    // The single-argument constructor passes a null closer, so guard against it.
    if (closer != null) {
      try {
        closer.close();
      } catch (IOException ignored) {
      }
    }
  }
}
```

This is then coupled with `ReaderIteratingFirehose`, which can be returned from `FirehoseFactory.connect()`:

```java
public class ReaderIteratingFirehose<T> implements Firehose {
  private final Iterator<FirehoseReader<T>> readers;
  private final Closeable closer;
  private FirehoseReader<T> reader = null;
  private InputRowParser<T> parser;
  private Iterator<InputRow> rowIterator;

  public ReaderIteratingFirehose(Iterator<FirehoseReader<T>> readers, InputRowParser<T> parser) {
    this(readers, parser, null);
  }

  public ReaderIteratingFirehose(
      Iterator<FirehoseReader<T>> readers,
      InputRowParser<T> parser,
      Closeable closer
  ) {
    this.readers = readers;
    this.parser = parser;
    this.closer = closer;
  }

  @Override
  public boolean hasMore() {
    // Rows left over from the last parsed batch
    if (rowIterator != null && rowIterator.hasNext()) {
      return true;
    }
    // Skip over exhausted readers until one has data or none are left
    while ((reader == null || !reader.hasNext()) && readers.hasNext()) {
      reader = getNextReader();
    }
    return reader != null && reader.hasNext();
  }

  @Nullable
  @Override
  public InputRow nextRow() {
    if (!hasMore()) {
      throw new NoSuchElementException();
    }
    // If we don't have any more parsed rows available parse a new batch
    if (rowIterator == null || !rowIterator.hasNext()) {
      rowIterator = parser.parseBatch(reader.next()).iterator();
    }
    return rowIterator.next();
  }

  // Closes the current reader (if any) before moving on to the next one
  private FirehoseReader<T> getNextReader() {
    if (reader != null) {
      reader.close();
    }
    return readers.next();
  }

  @Override
  public Runnable commit() {
    return Runnables.getNoopRunnable();
  }

  @Override
  public void close() throws IOException {
    try {
      if (reader != null) {
        reader.close();
      }
    } catch (Throwable t) {
      // Still attempt to close the closer, keeping the original failure as primary
      try {
        if (closer != null) {
          closer.close();
        }
      } catch (Exception e) {
        t.addSuppressed(e);
      }
      throw t;
    }
    if (closer != null) {
      closer.close();
    }
  }
}
```

For example, to use it with the `SequenceFileFirehoseReader` above (Kotlin):

```kotlin
return ReaderIteratingFirehose<ByteBuffer>(
    object : Iterator<SequenceFileFirehoseReader> {
        override fun hasNext(): Boolean {
            return fetcher.hasNext()
        }

        override fun next(): SequenceFileFirehoseReader {
            if (!hasNext()) {
                throw NoSuchElementException()
            }
            val openedObject = fetcher.next()
            val stream: InputStream
            try {
                stream = wrapObjectStream(
                    openedObject,
                    openObjectStream(openedObject)
                )
            } catch (e: IOException) {
                throw RuntimeException(e)
            }
            return SequenceFileFirehoseReader(stream)
        }
    },
    parser
)
```

There is still a lot of work to be done, though. Beyond fixing up `StringInputRowParser`, the prefetching and file-caching firehoses would need to be refactored, etc. I *should* have some more bandwidth for this soon, and any mentoring or advice on how to deal with the `StringInputRowParser` situation would be greatly appreciated, as we would like to get this work upstream.
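One detail of the design above worth spelling out: `ReaderIteratingFirehose.hasMore()` can call `reader.hasNext()` more than once before `next()` is called, so a `FirehoseReader` needs a `hasNext()` that does not consume a record. The following is a rough, self-contained sketch of that contract with no Hadoop or Druid dependency. `ListFirehoseReader`, `ReaderChainDemo`, and `drain` are hypothetical names used only for illustration, and the interface is redeclared so that the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Same shape as the FirehoseReader interface proposed above, redeclared for a standalone sketch.
interface FirehoseReader<T> {
  boolean hasNext();
  T next();
  void close();
}

// Hypothetical in-memory reader standing in for a file-backed one.
class ListFirehoseReader<T> implements FirehoseReader<T> {
  private final Iterator<T> items;
  boolean closed = false;

  ListFirehoseReader(List<T> items) {
    this.items = items.iterator();
  }

  @Override
  public boolean hasNext() {
    return items.hasNext(); // no side effects, safe to call repeatedly
  }

  @Override
  public T next() {
    return items.next();
  }

  @Override
  public void close() {
    closed = true;
  }
}

class ReaderChainDemo {
  // Drains each reader in turn, closing it once exhausted. hasNext() is deliberately
  // called twice per record to mimic how a chaining firehose may probe a reader.
  static <T> List<T> drain(Iterator<FirehoseReader<T>> readers) {
    List<T> out = new ArrayList<>();
    while (readers.hasNext()) {
      FirehoseReader<T> reader = readers.next();
      try {
        while (reader.hasNext() && reader.hasNext()) {
          out.add(reader.next());
        }
      } finally {
        reader.close();
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<FirehoseReader<String>> readers = Arrays.<FirehoseReader<String>>asList(
        new ListFirehoseReader<String>(Arrays.asList("a", "b")),
        new ListFirehoseReader<String>(new ArrayList<String>()), // empty input is skipped
        new ListFirehoseReader<String>(Arrays.asList("c")));
    System.out.println(drain(readers.iterator())); // prints [a, b, c]
  }
}
```

A reader whose `hasNext()` advanced the underlying stream would lose every other record under this access pattern, which is why a stream-backed implementation would need to buffer or cache its look-ahead.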