TheNeuralBit commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r989464309


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):
+  """A DoFn that consumes Python dictionaries and yields a pyarrow table."""
+  def __init__(
+      self,
+      schema,
+      row_group_buffer_size=64 * 1024 * 1024,
+      record_batch_size=1000):
+    self._schema = schema
+    self._row_group_buffer_size = row_group_buffer_size
+    self._buffer = [[] for _ in range(len(schema.names))]
+    self._buffer_size = record_batch_size
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+
+  def process(self, row):
+    if len(self._buffer[0]) >= self._buffer_size:
+      self._flush_buffer()
+
+    if self._record_batches_byte_size >= self._row_group_buffer_size:

Review Comment:
   Hm, so the counts don't line up exactly? How far apart are they? If they
differ, I would suspect the number reported by pyarrow is the correct one.

   I think this is worth digging into further, but I'd recommend filing an
issue and doing it in a separate PR. That way this one stays mostly a no-op
that just moves code around.
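
   For anyone picking up that follow-up issue, here is a rough, pure-Python 
sketch of the two-level buffering in the hunk above, useful for reasoning 
about what a different byte count would change. It is not the Beam 
implementation: `TwoLevelBuffer`, `size_of`, `exact`, `padded`, `run` and 
`row_count` are hypothetical names, the "batches" are plain lists rather than 
pyarrow record batches, and the emit step after the size check is an 
assumption, since the hunk is cut off at that line.

```python
class TwoLevelBuffer:
  """Stand-in for the row buffer / record batch buffering (no pyarrow)."""
  def __init__(self, names, size_of, row_group_buffer_size, record_batch_size):
    self._names = names
    self._size_of = size_of  # hypothetical: byte estimate for one batch
    self._row_group_buffer_size = row_group_buffer_size
    self._buffer_size = record_batch_size
    self._buffer = [[] for _ in names]
    self._batches = []
    self._batches_byte_size = 0

  def _flush_buffer(self):
    # Turn the buffered columns into one "batch" and account for its size.
    if self._buffer[0]:
      batch = [list(col) for col in self._buffer]
      self._batches.append(batch)
      self._batches_byte_size += self._size_of(batch)
      self._buffer = [[] for _ in self._names]

  def _emit(self):
    # Assumed behaviour: hand back the pending batches as one row group.
    group, self._batches, self._batches_byte_size = self._batches, [], 0
    return group

  def process(self, row):
    if len(self._buffer[0]) >= self._buffer_size:
      self._flush_buffer()
    if self._batches_byte_size >= self._row_group_buffer_size:
      yield self._emit()
    for i, name in enumerate(self._names):
      self._buffer[i].append(row[name])

  def finish(self):
    self._flush_buffer()
    if self._batches:
      yield self._emit()


def run(rows, size_of):
  buf = TwoLevelBuffer(
      ["a", "b"], size_of, row_group_buffer_size=64, record_batch_size=2)
  groups = []
  for row in rows:
    groups.extend(buf.process(row))
  groups.extend(buf.finish())
  return groups


def row_count(groups):
  return sum(len(batch[0]) for group in groups for batch in group)


def exact(batch):
  # Made-up estimate: 8 bytes per value.
  return sum(8 * len(col) for col in batch)


def padded(batch):
  # Same estimate plus a fixed overhead, to mimic a count that runs high.
  return exact(batch) + 64


rows = [{"a": i, "b": 2 * i} for i in range(10)]
groups_exact = run(rows, exact)
groups_padded = run(rows, padded)
print(len(groups_exact), row_count(groups_exact))
print(len(groups_padded), row_count(groups_padded))
```

   In this toy version the two estimates produce a different number of row 
groups but the same rows overall, so a mismatch in the count would only move 
the group boundaries. Whether that also holds for the real DoFn is something 
the follow-up issue would need to confirm.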


