claudevdm commented on code in PR #38015:
URL: https://github.com/apache/beam/pull/38015#discussion_r3017326162


##########
sdks/python/apache_beam/io/gcp/bigquery_change_history.py:
##########
@@ -925,6 +1001,59 @@ def _read_stream_batch(self, stream_name: str) -> 
Iterable[Dict[str, Any]]:
         elapsed,
         row_count / elapsed if elapsed > 0 else 0)
 
+  def _read_stream_raw(self, stream_name: str) -> Iterable[Tuple[bytes, 
bytes]]:
+    """Yield raw (schema_bytes, batch_bytes) without decompression.
+
+    Used when emit_raw_batches is enabled to defer decompression and
+    Arrow-to-Python conversion to a downstream DoFn after reshuffling.
+    Schema bytes are included in each tuple so each batch is
+    self-contained and can be decoded independently.
+    """
+    schema_bytes = b''
+    batch_count = 0
+    t0 = time.time()
+    for response in self._storage_client.read_rows(stream_name):
+      if not schema_bytes and response.arrow_schema.serialized_schema:
+        schema_bytes = bytes(response.arrow_schema.serialized_schema)
+      batch_bytes = response.arrow_record_batch.serialized_record_batch
+      if batch_bytes and schema_bytes:
+        yield (schema_bytes, bytes(batch_bytes))
+        batch_count += 1
+    elapsed = time.time() - t0
+    _LOGGER.info('[Read] raw_read: %d batches in %.2fs', batch_count, elapsed)
+
+
+class _DecompressArrowBatchesFn(beam.DoFn):
+  """Decompress and convert raw Arrow batches to timestamped row dicts.
+
+  Receives GBK output: (shard_key, Iterable[(schema_bytes, batch_bytes)])
+  and converts each batch to individual row dicts with event timestamps
+  extracted from the change_timestamp column.
+  """
+  def __init__(self, change_timestamp_column: str = 'change_timestamp') -> 
None:
+    self._change_timestamp_column = change_timestamp_column
+
+  def process(
+      self, element: Tuple[int, Iterable[Tuple[bytes, bytes]]]
+  ) -> Iterable[Dict[str, Any]]:
+    _, batches = element
+    for schema_bytes, batch_bytes in batches:
+      schema = pyarrow.ipc.read_schema(pyarrow.py_buffer(schema_bytes))
+      batch = pyarrow.ipc.read_record_batch(
+          pyarrow.py_buffer(batch_bytes), schema)

Review Comment:
   Not worth it. decoding schema is cheap



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to