TheNeuralBit commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r989461981
##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
yield row
+class _RowDictionariesToArrowTable(DoFn):
+ """A DoFn that consumes Python dictionaries and yields a pyarrow table."""
+ def __init__(
+ self,
+ schema,
+ row_group_buffer_size=64 * 1024 * 1024,
+ record_batch_size=1000):
+ self._schema = schema
+ self._row_group_buffer_size = row_group_buffer_size
+ self._buffer = [[] for _ in range(len(schema.names))]
+ self._buffer_size = record_batch_size
+ self._record_batches = []
+ self._record_batches_byte_size = 0
+
+ def process(self, row):
+ if len(self._buffer[0]) >= self._buffer_size:
+ self._flush_buffer()
+
+ if self._record_batches_byte_size >= self._row_group_buffer_size:
+ table = self._create_table()
+ yield table
+
+ # reorder the data in columnar format.
+ for i, n in enumerate(self._schema.names):
+ self._buffer[i].append(row[n])
+
+ def finish_bundle(self):
+ if len(self._buffer[0]) > 0:
+ self._flush_buffer()
+ if self._record_batches_byte_size > 0:
+ table = self._create_table()
+ yield window.GlobalWindows.windowed_value_at_end_of_window(table)
+
+ def display_data(self):
+ res = super().display_data()
+ res['row_group_buffer_size'] = str(self._row_group_buffer_size)
+ res['buffer_size'] = str(self._buffer_size)
+
+ return res
+
+ def _create_table(self):
+ table = pa.Table.from_batches(self._record_batches, schema=self._schema)
+ self._record_batches = []
+ self._record_batches_byte_size = 0
+ return table
+
+ def _flush_buffer(self):
+ arrays = [[] for _ in range(len(self._schema.names))]
+ for x, y in enumerate(self._buffer):
+ arrays[x] = pa.array(y, type=self._schema.types[x])
+ self._buffer[x] = []
+ rb = pa.RecordBatch.from_arrays(arrays, schema=self._schema)
+ self._record_batches.append(rb)
+ size = 0
+ for x in arrays:
Review Comment:
Note we test with all supported major versions of pyarrow in CI, so you
don't need to worry about unintentionally breaking compatibility with an old
version.
Also, if supporting pyarrow 0.x or 1.x is problematic, we could go ahead and
drop them. They are two years old at this point.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]