emkornfield commented on a change in pull request #10108:
URL: https://github.com/apache/arrow/pull/10108#discussion_r619886485
##########
File path: cpp/src/jni/dataset/jni_util.cc
##########
@@ -211,32 +234,140 @@ std::vector<std::string> ToStringVector(JNIEnv* env,
jobjectArray& str_array) {
return vector;
}
-arrow::Result<jbyteArray> ToSchemaByteArray(JNIEnv* env,
- std::shared_ptr<arrow::Schema>
schema) {
- ARROW_ASSIGN_OR_RAISE(
- std::shared_ptr<arrow::Buffer> buffer,
- arrow::ipc::SerializeSchema(*schema, arrow::default_memory_pool()))
+Result<jbyteArray> ToSchemaByteArray(JNIEnv* env, std::shared_ptr<Schema>
schema) {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+ ipc::SerializeSchema(*schema, default_memory_pool()))
jbyteArray out = env->NewByteArray(buffer->size());
auto src = reinterpret_cast<const jbyte*>(buffer->data());
env->SetByteArrayRegion(out, 0, buffer->size(), src);
return out;
}
-arrow::Result<std::shared_ptr<arrow::Schema>> FromSchemaByteArray(
- JNIEnv* env, jbyteArray schemaBytes) {
- arrow::ipc::DictionaryMemo in_memo;
+Result<std::shared_ptr<Schema>> FromSchemaByteArray(JNIEnv* env, jbyteArray
schemaBytes) {
+ ipc::DictionaryMemo in_memo;
int schemaBytes_len = env->GetArrayLength(schemaBytes);
jbyte* schemaBytes_data = env->GetByteArrayElements(schemaBytes, nullptr);
- auto serialized_schema = std::make_shared<arrow::Buffer>(
+ auto serialized_schema = std::make_shared<Buffer>(
reinterpret_cast<uint8_t*>(schemaBytes_data), schemaBytes_len);
- arrow::io::BufferReader buf_reader(serialized_schema);
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> schema,
- arrow::ipc::ReadSchema(&buf_reader, &in_memo))
+ io::BufferReader buf_reader(serialized_schema);
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> schema,
+ ipc::ReadSchema(&buf_reader, &in_memo))
env->ReleaseByteArrayElements(schemaBytes, schemaBytes_data, JNI_ABORT);
return schema;
}
+Status SetSingleField(std::shared_ptr<ArrayData> array_data,
+ UnsafeNativeManagedRecordBatchProto& batch_proto) {
+ FieldProto* field_adder = batch_proto.add_fields();
+ field_adder->set_length(array_data->length);
+ field_adder->set_nullcount(array_data->null_count);
+
+ std::vector<std::shared_ptr<Buffer>> buffers;
+ for (const auto& buffer : array_data->buffers) {
+ buffers.push_back(buffer);
+ }
Review comment:
It seems like this loop and the loop below could be combined.
##########
File path: cpp/src/jni/dataset/jni_util.cc
##########
@@ -117,26 +118,48 @@ class ReservationListenableMemoryPool::Impl {
std::shared_ptr<ReservationListener> get_listener() { return listener_; }
private:
- arrow::MemoryPool* pool_;
+ MemoryPool* pool_;
std::shared_ptr<ReservationListener> listener_;
int64_t block_size_;
int64_t blocks_reserved_;
int64_t bytes_reserved_;
std::mutex mutex_;
};
+/// \brief Buffer implementation that binds to a
+/// Java buffer reference. Java buffer's release
+/// method will be called once when being destructed.
+class JavaAllocatedBuffer : public Buffer {
+ public:
+ JavaAllocatedBuffer(JNIEnv* env, jobject cleaner_ref, jmethodID
cleaner_method_ref,
+ uint8_t* buffer, int32_t len)
+ : Buffer(buffer, len),
+ env_(env),
+ cleaner_ref_(cleaner_ref),
+ cleaner_method_ref_(cleaner_method_ref) {}
+
+ ~JavaAllocatedBuffer() override {
+ env_->CallVoidMethod(cleaner_ref_, cleaner_method_ref_);
+ env_->DeleteGlobalRef(cleaner_ref_);
+ }
+
+ private:
+ JNIEnv* env_;
+ jobject cleaner_ref_;
+ jmethodID cleaner_method_ref_;
+};
+
ReservationListenableMemoryPool::ReservationListenableMemoryPool(
MemoryPool* pool, std::shared_ptr<ReservationListener> listener, int64_t
block_size) {
impl_.reset(new Impl(pool, listener, block_size));
}
-arrow::Status ReservationListenableMemoryPool::Allocate(int64_t size,
uint8_t** out) {
+Status ReservationListenableMemoryPool::Allocate(int64_t size, uint8_t** out) {
Review comment:
Mixing style changes like this with new code makes reviews harder;
please try to avoid large-scale changes like this in the future.
##########
File path: cpp/src/jni/dataset/proto/record_batch.proto
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
Review comment:
Why are we using protos for this information?
##########
File path:
java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
##########
@@ -126,6 +128,7 @@ public void testParquetBatchSize() throws Exception {
checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
AutoCloseables.close(datum);
+ AutoCloseables.close(factory);
Review comment:
Combine this line with the previous one? I believe `close` can take
multiple parameters.
##########
File path: cpp/src/jni/dataset/proto/record_batch.proto
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
Review comment:
In particular, for the most part this looks like it replicates data
already defined in flatbuffers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]