zhztheplayer commented on code in PR #14151:
URL: https://github.com/apache/arrow/pull/14151#discussion_r972755514
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -170,6 +177,116 @@ class DisposableScannerAdaptor {
}
};
+/// \brief Simple fragment implementation that is constructed directly
+/// from a record batch iterator.
+///
+/// The wrapped iterator is consumed as the fragment is scanned, so the
+/// fragment can be scanned only once: a second call to ScanBatchesAsync()
+/// returns an Invalid status instead of silently producing no batches.
+class SimpleIteratorFragment : public arrow::dataset::Fragment {
+ public:
+  explicit SimpleIteratorFragment(arrow::RecordBatchIterator itr)
+      : arrow::dataset::Fragment(), itr_(std::move(itr)) {}
+
+  static arrow::Result<std::shared_ptr<SimpleIteratorFragment>> Make(
+      arrow::RecordBatchIterator itr) {
+    return std::make_shared<SimpleIteratorFragment>(std::move(itr));
+  }
+
+  /// \brief Return a generator that pulls batches from the wrapped iterator.
+  ///
+  /// Each call to the generator pulls one item from the iterator. A normal
+  /// batch is delivered as-is; the iterator's end marker ends the stream; an
+  /// error status from the iterator is delivered as a failed future (it is
+  /// not turned into a process abort).
+  arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<arrow::dataset::ScanOptions>& options) override {
+    if (used_) {
+      return arrow::Status::Invalid("SimpleIteratorFragment can only be scanned once");
+    }
+    used_ = true;
+
+    struct Generator {
+      explicit Generator(std::shared_ptr<SimpleIteratorFragment> fragment)
+          : fragment(std::move(fragment)) {}
+
+      arrow::Future<std::shared_ptr<arrow::RecordBatch>> operator()() {
+        arrow::Result<std::shared_ptr<arrow::RecordBatch>> next =
+            fragment->itr_.Next();
+        // Only an OK end marker terminates the stream. Errors must not go
+        // through ValueOrDie(): they are forwarded to the consumer together
+        // with regular batches via the finished future below.
+        if (next.ok() && arrow::IsIterationEnd(*next)) {
+          return arrow::AsyncGeneratorEnd<std::shared_ptr<arrow::RecordBatch>>();
+        }
+        return arrow::Future<std::shared_ptr<arrow::RecordBatch>>::MakeFinished(
+            std::move(next));
+      }
+
+      // Strong reference so the fragment (and therefore itr_) outlives every
+      // copy of the generator.
+      std::shared_ptr<SimpleIteratorFragment> fragment;
+    };
+    return Generator(arrow::internal::checked_pointer_cast<SimpleIteratorFragment>(
+        shared_from_this()));
+  }
+
+  std::string type_name() const override { return "simple_iterator"; }
+
+  arrow::Result<std::shared_ptr<arrow::Schema>> ReadPhysicalSchemaImpl() override {
+    return arrow::Status::NotImplemented("No physical schema is readable");
+  }
+
+ private:
+  arrow::RecordBatchIterator itr_;
+  // Set once ScanBatchesAsync() has handed out a generator that consumes itr_.
+  bool used_ = false;
+};
+
+/// \brief Create scanner that scans over Java dataset API's components.
+///
+/// Currently, we use a NativeRecordBatchIterator as the underlying
+/// Java object to do scanning, which means only a single task will
+/// be produced from C++ code.
+arrow::Result<std::shared_ptr<arrow::dataset::Scanner>> MakeJavaDatasetScanner(
+ JavaVM* vm, jobject iter, std::shared_ptr<arrow::Schema> schema) {
+ arrow::RecordBatchIterator itr = arrow::MakeFunctionIterator(
Review Comment:
Is there a better way to distinguish these two variables than naming them
`iter` and `itr`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]