peridotml commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r967882132


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):
+  """ A DoFn that consumes python dictionarys and yields a pyarrow table."""
+  def __init__(
+      self,
+      schema,
+      row_group_buffer_size=64 * 1024 * 1024,
+      record_batch_size=1000):
+    self._schema = schema
+    self._row_group_buffer_size = row_group_buffer_size
+    self._buffer = [[] for _ in range(len(schema.names))]
+    self._buffer_size = record_batch_size
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+
+  def process(self, row):
+    if len(self._buffer[0]) >= self._buffer_size:
+      self._flush_buffer()
+
+    if self._record_batches_byte_size >= self._row_group_buffer_size:

Review Comment:
   There is also `[rb|pa_table].get_total_buffer_size()`, which would save a 
few lines of code and is actually a lot faster, although the difference is 
pretty negligible in absolute terms.
   
   Test results
   * 900 rows x 10 columns: 830 ns vs 9.94 µs
   * 900 rows x 100 columns: 9.8 µs vs 100 µs
   
   Unfortunately, the byte counts from the two approaches don't line up 
perfectly, and the current code is already pretty fast. Just wanted to flag 
this as an option!



