What I came up with was this:
First, a new interface that reads a file and emits records that can be passed to parsers:
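```java
public interface FirehoseReader<T>
{
  // Must be idempotent: calling it repeatedly must not consume a record
  boolean hasNext();

  // Returns the next record; only valid after hasNext() returned true
  T next();

  // Releases the underlying file/stream
  void close();
}
```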
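To make the contract concrete, here is a minimal sketch of an in-memory `FirehoseReader`. `IteratorFirehoseReader` is a hypothetical name used only for illustration (it is not part of Druid), and the sketch assumes `hasNext()` must be safe to call repeatedly without consuming a record, because `ReaderIteratingFirehose` further down calls it more than once per row. The interface is repeated so the snippet compiles on its own.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Repeated from above so the sketch is self-contained.
interface FirehoseReader<T> {
  boolean hasNext();
  T next();
  void close();
}

// Hypothetical reader backed by an in-memory list, e.g. for testing parsers
// without touching the file system.
class IteratorFirehoseReader<T> implements FirehoseReader<T> {
  private final Iterator<T> records;
  private boolean closed = false;

  IteratorFirehoseReader(List<T> records) {
    this.records = records.iterator();
  }

  @SafeVarargs
  static <T> IteratorFirehoseReader<T> of(T... records) {
    return new IteratorFirehoseReader<>(Arrays.asList(records));
  }

  @Override
  public boolean hasNext() {
    // Idempotent: asking repeatedly never consumes a record.
    return !closed && records.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return records.next();
  }

  @Override
  public void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}
```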
For our use case, we implement a `FirehoseReader` that reads sequence files and emits `ByteBuffer`s, so that any `ByteBufferInputRowParser` implementation can be used with it:
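```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileFirehoseReader implements FirehoseReader<ByteBuffer> {
  private final Reader reader;
  private final Closeable closer; // may be null
  private final Writable ignoredKey; // reused for every record; only values are emitted
  // reader.next(key) advances the file, so track whether a record has already been read ahead
  private boolean pending = false;
  private boolean exhausted = false;

  public SequenceFileFirehoseReader(InputStream inputStream) throws IOException {
    this(inputStream, null);
  }

  public SequenceFileFirehoseReader(InputStream inputStream, Closeable closer) throws IOException {
    Configuration config = new Configuration();
    // Register aircompressor's Snappy codec for compressed sequence files
    config.set("io.compression.codecs", "io.airlift.compress.snappy.SnappyCodec");
    // FSDataInputStream requires inputStream to implement Seekable and PositionedReadable
    this.reader = new Reader(config, Reader.stream(new FSDataInputStream(inputStream)));
    this.ignoredKey = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config);
    this.closer = closer;
  }

  @Override
  public boolean hasNext() {
    if (!pending && !exhausted) {
      try {
        pending = reader.next(ignoredKey);
      } catch (IOException e) {
        // Surface read errors instead of silently treating them as end of file
        throw new UncheckedIOException(e);
      }
      exhausted = !pending;
    }
    return pending;
  }

  @Override
  public ByteBuffer next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    pending = false;
    BytesWritable value = new BytesWritable();
    try {
      reader.getCurrentValue(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // getBytes() returns the padded backing array, so only expose the first getLength() bytes
    return ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
  }

  @Override
  public void close() {
    try {
      reader.close();
    } catch (IOException ignored) {
      // best-effort cleanup
    }
    if (closer != null) {
      try {
        closer.close();
      } catch (IOException ignored) {
        // best-effort cleanup
      }
    }
  }
}
```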
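One detail worth calling out: `SequenceFile.Reader.next(key)` advances the file position, so a `hasNext()` that simply delegates to it is not idempotent, and a caller that checks `hasNext()` twice before calling `next()` (as `ReaderIteratingFirehose.hasMore()` does) would silently skip records. The usual remedy is a one-record look-ahead flag. Below is a Hadoop-free sketch of that pattern; `AdvancingSource`, `LookAheadIterator` and `ListSource` are hypothetical names, with `AdvancingSource` standing in for the `next(key)` / `getCurrentValue(value)` pair.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for an "advance, then read the current value" API,
// shaped like SequenceFile.Reader's next(key) followed by getCurrentValue(value).
interface AdvancingSource<V> {
  boolean advance(); // moves to the next record; false at end of input
  V current();       // value of the record the source is positioned on
}

// Wraps an AdvancingSource so hasNext() can be called any number of times.
class LookAheadIterator<V> implements Iterator<V> {
  private final AdvancingSource<V> source;
  private boolean positioned = false; // advance() succeeded and the record is not yet handed out
  private boolean exhausted = false;

  LookAheadIterator(AdvancingSource<V> source) {
    this.source = source;
  }

  @Override
  public boolean hasNext() {
    if (!positioned && !exhausted) {
      positioned = source.advance();
      exhausted = !positioned;
    }
    return positioned;
  }

  @Override
  public V next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    positioned = false; // hand out the current record exactly once
    return source.current();
  }
}

// Tiny list-backed source used to exercise the wrapper.
class ListSource<V> implements AdvancingSource<V> {
  private final Iterator<V> it;
  private V current;

  @SafeVarargs
  ListSource(V... values) {
    this.it = Arrays.asList(values).iterator();
  }

  @Override
  public boolean advance() {
    if (!it.hasNext()) {
      return false;
    }
    current = it.next();
    return true;
  }

  @Override
  public V current() {
    return current;
  }
}
```

The same two flags appear in any reader whose underlying API only offers "advance and report success", so the look-ahead lives in the reader rather than in every caller.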
This is then coupled with `ReaderIteratingFirehose`, which can be returned from `FirehoseFactory.connect()`:
```java
// Druid 0.12-era (io.druid.*) package names
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.utils.Runnables;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;

public class ReaderIteratingFirehose<T> implements Firehose {
  private final Iterator<? extends FirehoseReader<T>> readers;
  private final InputRowParser<T> parser;
  @Nullable
  private final Closeable closer;
  private FirehoseReader<T> reader = null;
  private Iterator<InputRow> rowIterator = null;

  public ReaderIteratingFirehose(Iterator<? extends FirehoseReader<T>> readers, InputRowParser<T> parser) {
    this(readers, parser, null);
  }

  public ReaderIteratingFirehose(
      Iterator<? extends FirehoseReader<T>> readers,
      InputRowParser<T> parser,
      @Nullable Closeable closer
  ) {
    this.readers = readers;
    this.parser = parser;
    this.closer = closer;
  }

  @Override
  public boolean hasMore() {
    if (rowIterator != null && rowIterator.hasNext()) {
      return true;
    }
    // Skip exhausted readers; relies on FirehoseReader.hasNext() being idempotent
    while ((reader == null || !reader.hasNext()) && readers.hasNext()) {
      reader = getNextReader();
    }
    return reader != null && reader.hasNext();
  }

  @Nullable
  @Override
  public InputRow nextRow() {
    if (!hasMore()) {
      throw new NoSuchElementException();
    }
    // If we don't have any more parsed rows available, parse a new batch
    if (rowIterator == null || !rowIterator.hasNext()) {
      rowIterator = parser.parseBatch(reader.next()).iterator();
    }
    // parseBatch() may return no rows; a null row tells the caller to skip it
    return rowIterator.hasNext() ? rowIterator.next() : null;
  }

  private FirehoseReader<T> getNextReader() {
    // Close the previous reader so at most one file is open at a time
    if (reader != null) {
      reader.close();
    }
    return readers.next();
  }

  @Override
  public Runnable commit() {
    return Runnables.getNoopRunnable();
  }

  @Override
  public void close() throws IOException {
    try {
      if (reader != null) {
        reader.close();
      }
    } catch (Throwable t) {
      // Still release the outer resource, keeping its failure as a suppressed exception
      try {
        if (closer != null) {
          closer.close();
        }
      } catch (Exception e) {
        t.addSuppressed(e);
      }
      throw t;
    }
    if (closer != null) {
      closer.close();
    }
  }
}
```
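The core of `ReaderIteratingFirehose` is flattening "many readers, each yielding records, each record parsing into zero or more rows" into a single stream of rows. The sketch below isolates just that logic without Druid types: a plain `Function<T, List<R>>` stands in for `InputRowParser.parseBatch()`, and `FlatteningIterator` and `ListReader` are hypothetical names. It assumes each reader's `hasNext()` is idempotent, and it skips records that parse to an empty batch rather than failing.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Repeated from above so the sketch compiles on its own.
interface FirehoseReader<T> {
  boolean hasNext();
  T next();
  void close();
}

// Hypothetical: flattens readers -> records -> parsed rows, closing each reader once drained.
class FlatteningIterator<T, R> implements Iterator<R> {
  private final Iterator<? extends FirehoseReader<T>> readers;
  private final Function<T, List<R>> parseBatch; // stand-in for InputRowParser.parseBatch()
  private FirehoseReader<T> reader = null;
  private Iterator<R> rows = Collections.emptyIterator();

  FlatteningIterator(Iterator<? extends FirehoseReader<T>> readers, Function<T, List<R>> parseBatch) {
    this.readers = readers;
    this.parseBatch = parseBatch;
  }

  @Override
  public boolean hasNext() {
    while (!rows.hasNext()) {
      // Advance to a reader that still has records, closing drained ones.
      while (reader == null || !reader.hasNext()) {
        if (reader != null) {
          reader.close();
          reader = null;
        }
        if (!readers.hasNext()) {
          return false;
        }
        reader = readers.next();
      }
      // A record may parse to zero rows; the outer loop then tries the next record.
      rows = parseBatch.apply(reader.next()).iterator();
    }
    return true;
  }

  @Override
  public R next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return rows.next();
  }
}

// Minimal list-backed reader used for the demo.
class ListReader<T> implements FirehoseReader<T> {
  private final Iterator<T> it;
  boolean closed = false;

  @SafeVarargs
  ListReader(T... records) {
    this.it = Arrays.asList(records).iterator();
  }

  @Override public boolean hasNext() { return it.hasNext(); }
  @Override public T next() { return it.next(); }
  @Override public void close() { closed = true; }
}
```

Looping until a non-empty batch is found keeps `hasNext()` honest; the Druid class instead relies on `nextRow()` being allowed to return `null` for a row the caller should skip.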
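For example, to use it with the `SequenceFileFirehoseReader` above (in Kotlin; `fetcher`, `openObjectStream()` and `wrapObjectStream()` come from the enclosing firehose factory):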
```kotlin
return ReaderIteratingFirehose<ByteBuffer>(
    object : Iterator<SequenceFileFirehoseReader> {
        override fun hasNext(): Boolean = fetcher.hasNext()

        override fun next(): SequenceFileFirehoseReader {
            if (!hasNext()) {
                throw NoSuchElementException()
            }
            val openedObject = fetcher.next()
            try {
                val stream: InputStream = wrapObjectStream(openedObject, openObjectStream(openedObject))
                // Each reader is opened lazily, only when the firehose asks for it
                return SequenceFileFirehoseReader(stream)
            } catch (e: IOException) {
                throw UncheckedIOException(e)
            }
        }
    },
    parser
)
```
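For Java callers, the anonymous Kotlin iterator can be written as a small generic adapter that opens each reader lazily. This is a sketch under assumptions: `LazyReaderIterator` and `ReaderOpener` are hypothetical names, and the opener function stands in for the `openObjectStream()` / `wrapObjectStream()` / `new SequenceFileFirehoseReader(...)` chain.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical: opens a reader for one object; may fail with IOException.
@FunctionalInterface
interface ReaderOpener<O, R> {
  R open(O object) throws IOException;
}

// Maps an iterator of objects to an iterator of readers, opening each one only on next().
class LazyReaderIterator<O, R> implements Iterator<R> {
  private final Iterator<O> objects;
  private final ReaderOpener<O, R> opener;

  LazyReaderIterator(Iterator<O> objects, ReaderOpener<O, R> opener) {
    this.objects = objects;
    this.opener = opener;
  }

  @Override
  public boolean hasNext() {
    return objects.hasNext();
  }

  @Override
  public R next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    O object = objects.next();
    try {
      return opener.open(object);
    } catch (IOException e) {
      // Iterator.next() cannot throw checked exceptions, so wrap it.
      throw new UncheckedIOException(e);
    }
  }
}
```

Something like `new ReaderIteratingFirehose<>(new LazyReaderIterator<>(objects, o -> new SequenceFileFirehoseReader(open(o))), parser)` would then mirror the Kotlin version, where `objects` and `open` are placeholders for whatever the factory provides.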
There is still a lot of work to be done, though. Beyond fixing up `StringInputRowParser`, the prefetching and file-caching firehose code would also need to be refactored.
I *should* have some more bandwidth for this soon, and any mentoring/advice on how to deal with the `StringInputRowParser` situation would be greatly appreciated, as we would like to get this work upstream.
[ Full content available at:
https://github.com/apache/incubator-druid/issues/5584 ]
This message was relayed via gitbox.apache.org for [email protected]