mapleFU commented on code in PR #43661:
URL: https://github.com/apache/arrow/pull/43661#discussion_r1730212079


##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -555,6 +562,66 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader
       });
 }
 
+struct CastingGenerator {
+  CastingGenerator(RecordBatchGenerator source, std::shared_ptr<Schema> 
final_schema,
+                   const std::unordered_set<std::string>& cols_to_skip,
+                   MemoryPool* pool = default_memory_pool())
+      : source_(source),
+        final_schema_(final_schema),

Review Comment:
   ```suggestion
           final_schema_(std::move(final_schema)),
   ```



##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -617,6 +684,9 @@ Result<RecordBatchGenerator> 
ParquetFileFormat::ScanBatchesAsync(
       [this, options, parquet_fragment, pre_filtered,
        row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) 
mutable
       -> Result<RecordBatchGenerator> {
+    // Since we already do the batching through the SlicingGenerator, we don't 
need the
+    // reader to batch its output.
+    reader->set_batch_size(std::numeric_limits<int64_t>::max());
     // Ensure that parquet_fragment has FileMetaData

Review Comment:
   Assuming a large file, would memory usage grows high in this case?



##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -555,6 +562,66 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader
       });
 }
 
+struct CastingGenerator {
+  CastingGenerator(RecordBatchGenerator source, std::shared_ptr<Schema> 
final_schema,
+                   const std::unordered_set<std::string>& cols_to_skip,
+                   MemoryPool* pool = default_memory_pool())
+      : source_(source),
+        final_schema_(final_schema),
+        cols_to_skip_(cols_to_skip),
+        exec_ctx(std::make_shared<compute::ExecContext>(pool)) {}
+
+  Future<std::shared_ptr<RecordBatch>> operator()() {
+    return this->source_().Then([this](const std::shared_ptr<RecordBatch>& 
next)
+                                    -> Result<std::shared_ptr<RecordBatch>> {
+      if (IsIterationEnd(next) || this->final_schema_ == nullptr) {
+        return next;
+      }
+      std::vector<std::shared_ptr<Array>> out_cols;
+      std::vector<std::shared_ptr<Field>> out_schema_fields;
+
+      bool changed = false;
+      for (const auto& field : this->final_schema_->fields()) {
+        FieldRef field_ref = FieldRef(field->name());
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> column,
+                              field_ref.GetOneOrNone(*next));
+        if (column) {
+          if (this->cols_to_skip_.count(field->name())) {
+            // Maintain the original input type.
+            out_schema_fields.emplace_back(field->WithType(column->type()));
+            out_cols.emplace_back(std::move(column));

Review Comment:
   So `cols_to_skip_` is just not "Project" this field, rather than not need 
this field?



##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -637,8 +707,17 @@ Result<RecordBatchGenerator> 
ParquetFileFormat::ScanBatchesAsync(
                           reader->GetRecordBatchGenerator(
                               reader, row_groups, column_projection,
                               ::arrow::internal::GetCpuThreadPool(), 
rows_to_readahead));
+    // We need to skip casting the dictionary columns since the dataset_schema 
doesn't
+    // have the dictionary-encoding information. Parquet reader will return 
them with the
+    // dictionary type, which is what we eventually want.

Review Comment:
   I've forget something here, would `Dict(String)` and `Cast(xxx -> 
LargeString)` matters?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to