westonpace commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r966382942
##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,47 @@ Future<std::vector<T>>
CollectAsyncGenerator(AsyncGenerator<T> generator) {
return Loop(LoopBody{std::move(generator), std::move(vec)});
}
+/// \brief this is just like a MapGenerator but the map fun returns a thing
instead of a
+/// future. Then we will launch each map fun as an independent task, instead
of piggy
+/// backing it to the future from the source.
Review Comment:
```suggestion
/// \brief Similar to MapGenerator but applies the map function in a new thread task.
/// This is similar to combining a map generator and a transfer generator, but that
/// combination would not be able to guarantee the map task runs on a new thread.
```
##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -184,27 +185,60 @@ static inline
Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
auto span =
tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format,
scan_options));
-
- ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
const auto& path = source.path();
- ARROW_ASSIGN_OR_RAISE(
- input, io::BufferedInputStream::Create(reader_options.block_size,
- default_memory_pool(),
std::move(input)));
+
+ auto actual_compression = Compression::type::UNCOMPRESSED;
+ // Guess compression from file extension
+ auto extension = fs::internal::GetAbstractPathExtension(path);
+ if (extension == "gz") {
+ actual_compression = Compression::type::GZIP;
+ } else {
+ auto maybe_compression = util::Codec::GetCompressionType(extension);
+ if (maybe_compression.ok()) {
+ ARROW_ASSIGN_OR_RAISE(actual_compression, maybe_compression);
+ }
+ }
+
+ Future<std::shared_ptr<csv::StreamingReader>> reader_fut;
+
+ if (actual_compression == Compression::type::UNCOMPRESSED) {
Review Comment:
```suggestion
// If the file is uncompressed we open the reader with a RandomAccessFile, which will
// be capable of reading the file in parallel. If the file is compressed we must use
// an input stream, and the file will be read sequentially.
if (actual_compression == Compression::type::UNCOMPRESSED) {
```
##########
cpp/src/arrow/io/interfaces.cc:
##########
@@ -138,6 +136,44 @@ Result<Iterator<std::shared_ptr<Buffer>>>
MakeInputStreamIterator(
return Iterator<std::shared_ptr<Buffer>>(InputStreamBlockIterator(stream,
block_size));
}
+// this is async re-entrant and can be used with a Readahead Generator
+// perhaps you should only use it with a Readahead Generator, otherwise there
+// is no point in using this, just use MakeInputStreamIterator instead.
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(
+ std::shared_ptr<RandomAccessFile> file, int64_t block_size) {
+ struct State {
+ explicit State(std::shared_ptr<RandomAccessFile> file_, int64_t
block_size_)
+ : file(std::move(file_)), block_size(block_size_), position(0) {}
+
+ Status Init() {
+ RETURN_NOT_OK(file->Seek(0));
+ ARROW_ASSIGN_OR_RAISE(total_size, file->GetSize());
+ return Status::OK();
+ }
+
+ std::shared_ptr<RandomAccessFile> file;
+ int64_t block_size;
+ int64_t total_size;
+ std::atomic<int64_t> position;
+ };
+
+ auto state = std::make_shared<State>(std::move(file), block_size);
+ RETURN_NOT_OK(state->Init());
+ return [state]() {
+ auto pos = state->position.fetch_add(state->block_size);
+ if (pos >= state->total_size) {
+ return AsyncGeneratorEnd<std::shared_ptr<Buffer>>();
+ }
+ // idx is guaranteed to be smaller than total size, but you might not be
able to read
+ // a full block
Review Comment:
```suggestion
// pos is guaranteed to be smaller than total size, but you might not be able to read
// a full block
```
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +972,148 @@ class StreamingReaderImpl : public ReaderMixin,
std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
};
+/////////////////////////////////////////////////////////////////////////
+// Base class for streaming readers
+
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]