This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new cb645a1b27 GH-43802: [GLib] Add `GAFlightRecordBatchWriter` (#43803)
cb645a1b27 is described below
commit cb645a1b27dd66fddb88458c939e2851f9dadf35
Author: Sutou Kouhei <[email protected]>
AuthorDate: Sat Aug 24 06:08:18 2024 +0900
GH-43802: [GLib] Add `GAFlightRecordBatchWriter` (#43803)
### Rationale for this change
This is needed to implement `DoPut`.
### What changes are included in this PR?
We can't add tests for it because it's an abstract class.
I'm not sure `is_owner` is needed like
`GAFlightRecordBatchReader`. `is_owner` may be removed later if we find
that it's needless.
### Are these changes tested?
No.
### Are there any user-facing changes?
Yes.
`GAFlightRecordBatchWriter` is a new public API.
* GitHub Issue: #43802
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
c_glib/arrow-flight-glib/common.cpp | 198 ++++++++++++++++++++++++++++++++++--
c_glib/arrow-flight-glib/common.h | 32 ++++++
c_glib/arrow-flight-glib/common.hpp | 4 +
3 files changed, 224 insertions(+), 10 deletions(-)
diff --git a/c_glib/arrow-flight-glib/common.cpp
b/c_glib/arrow-flight-glib/common.cpp
index efc544f10c..f7eea08c26 100644
--- a/c_glib/arrow-flight-glib/common.cpp
+++ b/c_glib/arrow-flight-glib/common.cpp
@@ -48,7 +48,11 @@ G_BEGIN_DECLS
*
* #GAFlightStreamChunk is a class for a chunk in stream.
*
- * #GAFlightRecordBatchReader is a class for reading record batches.
+ * #GAFlightRecordBatchReader is an abstract class for reading record
+ * batches with metadata.
+ *
+ * #GAFlightRecordBatchWriter is an abstract class for
+ * writing record batches with metadata.
*
* Since: 5.0.0
*/
@@ -1172,13 +1176,13 @@ typedef struct GAFlightRecordBatchReaderPrivate_
} GAFlightRecordBatchReaderPrivate;
enum {
- PROP_READER = 1,
- PROP_IS_OWNER,
+ PROP_RECORD_BATCH_READER_READER = 1,
+ PROP_RECORD_BATCH_READER_IS_OWNER,
};
-G_DEFINE_TYPE_WITH_PRIVATE(GAFlightRecordBatchReader,
- gaflight_record_batch_reader,
- G_TYPE_OBJECT)
+G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE(GAFlightRecordBatchReader,
+ gaflight_record_batch_reader,
+ G_TYPE_OBJECT)
#define GAFLIGHT_RECORD_BATCH_READER_GET_PRIVATE(obj)
\
static_cast<GAFlightRecordBatchReaderPrivate *>(
\
@@ -1204,11 +1208,11 @@ gaflight_record_batch_reader_set_property(GObject
*object,
auto priv = GAFLIGHT_RECORD_BATCH_READER_GET_PRIVATE(object);
switch (prop_id) {
- case PROP_READER:
+ case PROP_RECORD_BATCH_READER_READER:
priv->reader =
static_cast<arrow::flight::MetadataRecordBatchReader
*>(g_value_get_pointer(value));
break;
- case PROP_IS_OWNER:
+ case PROP_RECORD_BATCH_READER_IS_OWNER:
priv->is_owner = g_value_get_boolean(value);
break;
default:
@@ -1236,7 +1240,7 @@
gaflight_record_batch_reader_class_init(GAFlightRecordBatchReaderClass *klass)
nullptr,
nullptr,
static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
- g_object_class_install_property(gobject_class, PROP_READER, spec);
+ g_object_class_install_property(gobject_class,
PROP_RECORD_BATCH_READER_READER, spec);
spec = g_param_spec_boolean(
"is-owner",
@@ -1244,7 +1248,7 @@
gaflight_record_batch_reader_class_init(GAFlightRecordBatchReaderClass *klass)
nullptr,
TRUE,
static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
- g_object_class_install_property(gobject_class, PROP_IS_OWNER, spec);
+ g_object_class_install_property(gobject_class,
PROP_RECORD_BATCH_READER_IS_OWNER, spec);
}
/**
@@ -1296,6 +1300,173 @@
gaflight_record_batch_reader_read_all(GAFlightRecordBatchReader *reader, GError
}
}
+/* Instance-private data of GAFlightRecordBatchWriter. */
+struct GAFlightRecordBatchWriterPrivate_
+{
+  /* Wrapped Arrow Flight C++ writer; set through the "writer" property. */
+  arrow::flight::MetadataRecordBatchWriter *writer;
+  /* When true, finalize deletes `writer`; set through "is-owner". */
+  bool is_owner;
+};
+typedef struct GAFlightRecordBatchWriterPrivate_ GAFlightRecordBatchWriterPrivate;
+
+/* Property IDs of GAFlightRecordBatchWriter, used by set_property and
+ * class_init. Numbering starts at 1. */
+enum {
+  PROP_RECORD_BATCH_WRITER_WRITER = 1,
+  PROP_RECORD_BATCH_WRITER_IS_OWNER = 2,
+};
+
+G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE(GAFlightRecordBatchWriter,
+                                    gaflight_record_batch_writer,
+                                    GARROW_TYPE_RECORD_BATCH_WRITER)
+
+/* Casts @object to GAFlightRecordBatchWriter and returns its
+ * GAFlightRecordBatchWriterPrivate instance data. */
+#define GAFLIGHT_RECORD_BATCH_WRITER_GET_PRIVATE(object) \
+  static_cast<GAFlightRecordBatchWriterPrivate *>( \
+    gaflight_record_batch_writer_get_instance_private( \
+      GAFLIGHT_RECORD_BATCH_WRITER(object)))
+
+/*
+ * GObject finalize handler for GAFlightRecordBatchWriter.
+ *
+ * Deletes the wrapped C++ writer when this instance owns it, then
+ * chains up to the parent class.
+ */
+static void
+gaflight_record_batch_writer_finalize(GObject *object)
+{
+  auto priv = GAFLIGHT_RECORD_BATCH_WRITER_GET_PRIVATE(object);
+  if (priv->is_owner) {
+    delete priv->writer;
+  }
+  // Chain up through this type's own parent class. Using another type's
+  // parent class here (e.g. gaflight_info_parent_class) would skip the
+  // finalize of the GArrowRecordBatchWriter parent.
+  G_OBJECT_CLASS(gaflight_record_batch_writer_parent_class)->finalize(object);
+}
+
+/*
+ * GObject set_property handler: stores the "writer" pointer and the
+ * "is-owner" flag into the instance-private data. Unknown property IDs
+ * are reported with G_OBJECT_WARN_INVALID_PROPERTY_ID().
+ */
+static void
+gaflight_record_batch_writer_set_property(GObject *object,
+                                          guint prop_id,
+                                          const GValue *value,
+                                          GParamSpec *pspec)
+{
+  auto writer_priv = GAFLIGHT_RECORD_BATCH_WRITER_GET_PRIVATE(object);
+
+  if (prop_id == PROP_RECORD_BATCH_WRITER_WRITER) {
+    auto raw_writer = g_value_get_pointer(value);
+    writer_priv->writer =
+      static_cast<arrow::flight::MetadataRecordBatchWriter *>(raw_writer);
+  } else if (prop_id == PROP_RECORD_BATCH_WRITER_IS_OWNER) {
+    writer_priv->is_owner = g_value_get_boolean(value);
+  } else {
+    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+  }
+}
+
+/* Instance initializer; intentionally empty because the private fields
+ * are filled in by the construct-only properties. */
+static void
+gaflight_record_batch_writer_init(GAFlightRecordBatchWriter *object)
+{
+  /* Nothing to initialize here. */
+}
+
+/*
+ * Class initializer: hooks up finalize/set_property and installs the
+ * write-only, construct-only "writer" (pointer) and "is-owner"
+ * (boolean, default TRUE) properties.
+ */
+static void
+gaflight_record_batch_writer_class_init(GAFlightRecordBatchWriterClass *klass)
+{
+  auto gobject_class = G_OBJECT_CLASS(klass);
+  gobject_class->finalize = gaflight_record_batch_writer_finalize;
+  gobject_class->set_property = gaflight_record_batch_writer_set_property;
+
+  // Both properties can only be written, and only at construction time.
+  const auto construct_only_flags =
+    static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY);
+
+  GParamSpec *writer_spec =
+    g_param_spec_pointer("writer", nullptr, nullptr, construct_only_flags);
+  g_object_class_install_property(gobject_class,
+                                  PROP_RECORD_BATCH_WRITER_WRITER,
+                                  writer_spec);
+
+  GParamSpec *is_owner_spec =
+    g_param_spec_boolean("is-owner", nullptr, nullptr, TRUE, construct_only_flags);
+  g_object_class_install_property(gobject_class,
+                                  PROP_RECORD_BATCH_WRITER_IS_OWNER,
+                                  is_owner_spec);
+}
+
+/**
+ * gaflight_record_batch_writer_begin:
+ * @writer: A #GAFlightRecordBatchWriter.
+ * @schema: A #GArrowSchema.
+ * @options: (nullable): A #GArrowWriteOptions.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Begins writing data with the given schema. Only used with
+ * `DoExchange`. When @options is %NULL, the default IPC write
+ * options are used.
+ *
+ * Returns: %TRUE on success, %FALSE on error.
+ *
+ * Since: 18.0.0
+ */
+gboolean
+gaflight_record_batch_writer_begin(GAFlightRecordBatchWriter *writer,
+                                   GArrowSchema *schema,
+                                   GArrowWriteOptions *options,
+                                   GError **error)
+{
+  auto flight_writer = gaflight_record_batch_writer_get_raw(writer);
+  auto arrow_schema = garrow_schema_get_raw(schema);
+  // Copy the caller's options when given, otherwise fall back to defaults.
+  arrow::ipc::IpcWriteOptions ipc_options =
+    options ? *garrow_write_options_get_raw(options)
+            : arrow::ipc::IpcWriteOptions::Defaults();
+  auto status = flight_writer->Begin(arrow_schema, ipc_options);
+  return garrow::check(error, status, "[flight-record-batch-writer][begin]");
+}
+
+/**
+ * gaflight_record_batch_writer_write_metadata:
+ * @writer: A #GAFlightRecordBatchWriter.
+ * @metadata: A #GArrowBuffer.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Writes @metadata to the stream without a record batch.
+ *
+ * Returns: %TRUE on success, %FALSE on error.
+ *
+ * Since: 18.0.0
+ */
+gboolean
+gaflight_record_batch_writer_write_metadata(GAFlightRecordBatchWriter *writer,
+                                            GArrowBuffer *metadata,
+                                            GError **error)
+{
+  auto flight_writer = gaflight_record_batch_writer_get_raw(writer);
+  auto status = flight_writer->WriteMetadata(garrow_buffer_get_raw(metadata));
+  return garrow::check(error, status, "[flight-record-batch-writer][write-metadata]");
+}
+
+/**
+ * gaflight_record_batch_writer_write:
+ * @writer: A #GAFlightRecordBatchWriter.
+ * @record_batch: A #GArrowRecordBatch.
+ * @metadata: (nullable): A #GArrowBuffer.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Write a record batch with metadata. When @metadata is %NULL, an
+ * empty (null) metadata buffer is passed to the underlying writer.
+ *
+ * Returns: %TRUE on success, %FALSE on error.
+ *
+ * Since: 18.0.0
+ */
+gboolean
+gaflight_record_batch_writer_write(GAFlightRecordBatchWriter *writer,
+                                   GArrowRecordBatch *record_batch,
+                                   GArrowBuffer *metadata,
+                                   GError **error)
+{
+  auto flight_writer = gaflight_record_batch_writer_get_raw(writer);
+  auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
+  // @metadata is documented as nullable, so only unwrap it when present
+  // instead of handing NULL to garrow_buffer_get_raw().
+  // NOTE(review): assumes WriteWithMetadata() accepts a null buffer -- confirm.
+  auto arrow_metadata = metadata ? garrow_buffer_get_raw(metadata) : nullptr;
+  return garrow::check(
+    error,
+    flight_writer->WriteWithMetadata(*arrow_record_batch, arrow_metadata),
+    "[flight-record-batch-writer][write]");
+}
+
G_END_DECLS
GAFlightCriteria *
@@ -1428,3 +1599,10 @@
gaflight_record_batch_reader_get_raw(GAFlightRecordBatchReader *reader)
auto priv = GAFLIGHT_RECORD_BATCH_READER_GET_PRIVATE(reader);
return priv->reader;
}
+
+/* Returns the wrapped C++ writer stored in @writer's private data.
+ * The pointer is returned as-is; this function does not copy it or
+ * change who deletes it. */
+arrow::flight::MetadataRecordBatchWriter *
+gaflight_record_batch_writer_get_raw(GAFlightRecordBatchWriter *writer)
+{
+  return GAFLIGHT_RECORD_BATCH_WRITER_GET_PRIVATE(writer)->writer;
+}
diff --git a/c_glib/arrow-flight-glib/common.h
b/c_glib/arrow-flight-glib/common.h
index b1d89f79c3..91c828caab 100644
--- a/c_glib/arrow-flight-glib/common.h
+++ b/c_glib/arrow-flight-glib/common.h
@@ -232,4 +232,36 @@ GAFLIGHT_AVAILABLE_IN_6_0
GArrowTable *
gaflight_record_batch_reader_read_all(GAFlightRecordBatchReader *reader,
GError **error);
+#define GAFLIGHT_TYPE_RECORD_BATCH_WRITER (gaflight_record_batch_writer_get_type())
+GAFLIGHT_AVAILABLE_IN_18_0
+G_DECLARE_DERIVABLE_TYPE(GAFlightRecordBatchWriter,
+                         gaflight_record_batch_writer,
+                         GAFLIGHT,
+                         RECORD_BATCH_WRITER,
+                         GArrowRecordBatchWriter)
+/* Class structure of GAFlightRecordBatchWriter; it only embeds the
+ * parent class structure and declares no members of its own. */
+struct _GAFlightRecordBatchWriterClass
+{
+  GArrowRecordBatchWriterClass parent_class;
+};
+
+GAFLIGHT_AVAILABLE_IN_18_0
+gboolean
+gaflight_record_batch_writer_begin(GAFlightRecordBatchWriter *writer,
+ GArrowSchema *schema,
+ GArrowWriteOptions *options,
+ GError **error);
+
+GAFLIGHT_AVAILABLE_IN_18_0
+gboolean
+gaflight_record_batch_writer_write_metadata(GAFlightRecordBatchWriter *writer,
+ GArrowBuffer *metadata,
+ GError **error);
+
+GAFLIGHT_AVAILABLE_IN_18_0
+gboolean
+gaflight_record_batch_writer_write(GAFlightRecordBatchWriter *writer,
+ GArrowRecordBatch *record_batch,
+ GArrowBuffer *metadata,
+ GError **error);
+
G_END_DECLS
diff --git a/c_glib/arrow-flight-glib/common.hpp
b/c_glib/arrow-flight-glib/common.hpp
index db56fff579..ae5a770339 100644
--- a/c_glib/arrow-flight-glib/common.hpp
+++ b/c_glib/arrow-flight-glib/common.hpp
@@ -79,3 +79,7 @@ gaflight_stream_chunk_get_raw(GAFlightStreamChunk *chunk);
GAFLIGHT_EXTERN
arrow::flight::MetadataRecordBatchReader *
gaflight_record_batch_reader_get_raw(GAFlightRecordBatchReader *reader);
+
+GAFLIGHT_EXTERN
+arrow::flight::MetadataRecordBatchWriter *
+gaflight_record_batch_writer_get_raw(GAFlightRecordBatchWriter *writer);