zenithyr opened a new issue, #36834:
URL: https://github.com/apache/arrow/issues/36834
### Describe the usage question you have. Please include as many useful
details as possible.
I am streaming time series data to Parquet files with the following requirements:
- I write the data out batch by batch to keep the memory footprint small.
- The data can optionally be partitioned by {hour, minute}.
I have been using
[parquet::arrow::FileWriter::WriteRecordBatch](https://arrow.apache.org/docs/cpp/parquet.html#writetable),
which can append batches to the file being written but doesn't support partitioning. My write path looks roughly like the sketch below.
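A minimal sketch of that approach (`make_schema()` and `make_batch()` are hypothetical stand-ins for my data source):
```cpp
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>

// Hypothetical stand-ins for my data source.
std::shared_ptr<arrow::Schema> make_schema();
std::shared_ptr<arrow::RecordBatch> make_batch();  // returns nullptr when done

arrow::Status write_batches() {
  ARROW_ASSIGN_OR_RAISE(auto sink,
                        arrow::io::FileOutputStream::Open("/tmp/sample.parquet"));
  // Snappy is straightforward to enable at this level.
  auto props = parquet::WriterProperties::Builder()
                   .compression(parquet::Compression::SNAPPY)
                   ->build();
  ARROW_ASSIGN_OR_RAISE(auto writer,
                        parquet::arrow::FileWriter::Open(*make_schema(),
                                                         arrow::default_memory_pool(),
                                                         sink, props));
  // Append batch by batch, so only one batch is held in memory at a time.
  while (auto batch = make_batch()) {
    ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  }
  return writer->Close();
}
```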
So I tried the [Dataset
API](https://arrow.apache.org/docs/cpp/dataset.html#reading-and-writing-partitioned-data).
It seems that
- `ExistingDataBehavior` can only overwrite or delete existing files; there is no append mode
- I don't know how to enable Snappy compression for the dataset file writer (Q1 in the code below; see also the sketch after it)
```cpp
// Assumed includes and namespace aliases to make the snippet self-contained.
#include <memory>
#include <string>
#include <vector>

#include <arrow/api.h>
#include <arrow/dataset/api.h>
#include <arrow/filesystem/api.h>

namespace ds = arrow::dataset;
namespace fs = arrow::fs;

const std::string base_dir = "/tmp/sample";

// Placeholder for my data source.
std::shared_ptr<arrow::RecordBatch> some_code_generating_one_batch();

arrow::Status append_a_batch(const std::shared_ptr<fs::FileSystem>& filesystem,
                             const std::shared_ptr<arrow::RecordBatch>& batch) {
  // Write using the Dataset API.
  std::vector<std::shared_ptr<arrow::RecordBatch>> batches{batch};
  auto dataset = std::make_shared<ds::InMemoryDataset>(batch->schema(), batches);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
  // Deliberately not partitioning the data, in order to append to the same file.
  auto partition_schema = arrow::schema({});  // arrow::schema({arrow::field("part", arrow::utf8())});
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  auto format = std::make_shared<ds::ParquetFileFormat>();
  // Q1. How to enable compression?
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_dir;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  // Q2. How to append to existing files?
  write_options.existing_data_behavior = ds::ExistingDataBehavior::kDeleteMatchingPartitions;
  return ds::FileSystemDataset::Write(write_options, scanner);
}

arrow::Status run() {  // Renamed from main(), which cannot return arrow::Status.
  ARROW_ASSIGN_OR_RAISE(auto filesystem, fs::FileSystemFromUriOrPath(base_dir));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir("/tmp/sample"));
  auto batch = some_code_generating_one_batch();
  // Writes to /tmp/sample/part0.parquet.
  ARROW_RETURN_NOT_OK(append_a_batch(filesystem, batch));
  // Can't append to /tmp/sample/part0.parquet.
  ARROW_RETURN_NOT_OK(append_a_batch(filesystem, batch));
  return arrow::Status::OK();
}
```
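For Q1, my current guess is to downcast the format's default write options to `ds::ParquetFileWriteOptions` and set its `writer_properties`, along these lines (a sketch continuing from the snippet above; it additionally needs `#include <parquet/properties.h>`). Is this the intended way?
```cpp
// Configure Snappy at the Parquet layer, then hand the options to the dataset writer.
auto format = std::make_shared<ds::ParquetFileFormat>();
auto parquet_options =
    std::static_pointer_cast<ds::ParquetFileWriteOptions>(format->DefaultWriteOptions());
parquet_options->writer_properties = parquet::WriterProperties::Builder()
                                         .compression(parquet::Compression::SNAPPY)
                                         ->build();
ds::FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = parquet_options;  // instead of the defaults
```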
If appending isn't possible, I have to accumulate a full partition in memory before flushing it to a file, which increases the memory footprint.
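One workaround I'm considering for Q2: keep the existing files intact and make the basename template unique per call, so each batch lands in a new file beside the previous ones instead of replacing them (the counter below is my own hypothetical bookkeeping):
```cpp
// Each call writes part-<n>-0.parquet rather than clobbering part0.parquet.
static int call_index = 0;
write_options.basename_template =
    "part-" + std::to_string(call_index++) + "-{i}.parquet";
// kOverwriteOrIgnore leaves unrelated existing files in place.
write_options.existing_data_behavior = ds::ExistingDataBehavior::kOverwriteOrIgnore;
```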
Please advise. Thanks.
### Component(s)
C++