gemini-code-assist[bot] commented on code in PR #38015:
URL: https://github.com/apache/beam/pull/38015#discussion_r3017042938


##########
sdks/python/apache_beam/io/gcp/bigquery_change_history.py:
##########
@@ -730,16 +738,75 @@ def _ensure_client(self) -> None:
   def setup(self) -> None:
     self._ensure_client()
 
+  def _split_all_streams(
+      self, stream_names: Tuple[str, ...],
+      max_split_rounds: int) -> Tuple[str, ...]:
+    """Split each stream at fraction=0.5 for up to max_split_rounds rounds.
+
+    Each round attempts to split every stream in the current list. A
+    successful split replaces the original stream with primary + remainder.
+    A refused split (both fields empty) keeps the original stream intact.
+    Stops when max_split_rounds is reached or a full round produces zero
+    new splits.
+
+    BQ's server-side granularity controls how many splits are possible.
+    Small tables may not split at all; large tables may allow multiple
+    rounds of doubling.
+    """
+    result = list(stream_names)
+    for round_num in range(1, max_split_rounds + 1):
+      new_result = []
+      made_progress = False
+      for name in result:
+        response = self._storage_client.split_read_stream(
+            request=bq_storage.types.SplitReadStreamRequest(
+                name=name, fraction=0.5))

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `split_read_stream` call is made synchronously, once per stream per 
round, inside the nested loop of `_split_all_streams`. For large tables with 
many initial streams and several split rounds, this can add up to a large 
number of sequential RPCs, which may cause timeouts or hit rate limits. 
Consider whether these calls can run in parallel, and whether an explicit 
retry strategy should be defined for this RPC.
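
One possible shape for a parallel round is sketched below. It is only an illustration: `split_round_parallel` and `split_fn` are hypothetical names that do not appear in the PR, and `split_fn` stands in for a wrapper around `self._storage_client.split_read_stream` that returns `(primary, remainder)` on success or `None` when the split is refused. The generated client methods generally accept `retry` and `timeout` keyword arguments, which may be the simplest place to define the retry policy, but that is not shown here.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Sequence, Tuple

# Hypothetical contract: (primary, remainder) on success, None if refused.
SplitFn = Callable[[str], Optional[Tuple[str, str]]]


def split_round_parallel(
    split_fn: SplitFn,
    stream_names: Sequence[str],
    max_workers: int = 8) -> Tuple[List[str], bool]:
  """Runs one split round concurrently and preserves input order."""
  with ThreadPoolExecutor(max_workers=max_workers) as pool:
    # Executor.map yields results in input order, so the resulting
    # stream list does not depend on which RPC finishes first.
    outcomes = list(pool.map(split_fn, stream_names))

  result: List[str] = []
  made_progress = False
  for name, outcome in zip(stream_names, outcomes):
    if outcome is None:
      # Refused split: keep the original stream intact.
      result.append(name)
    else:
      # Successful split: replace the stream with primary + remainder.
      result.extend(outcome)
      made_progress = True
  return result, made_progress
```

The outer loop over rounds would stay sequential, since each round depends on the stream list produced by the previous one. `max_workers` bounds the number of in-flight RPCs, which also helps with the rate-limit concern.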



##########
sdks/python/apache_beam/io/gcp/bigquery_change_history.py:
##########
@@ -925,6 +1001,59 @@ def _read_stream_batch(self, stream_name: str) -> Iterable[Dict[str, Any]]:
         elapsed,
         row_count / elapsed if elapsed > 0 else 0)
 
+  def _read_stream_raw(self, stream_name: str) -> Iterable[Tuple[bytes, bytes]]:
+    """Yield raw (schema_bytes, batch_bytes) without decompression.
+
+    Used when emit_raw_batches is enabled to defer decompression and
+    Arrow-to-Python conversion to a downstream DoFn after reshuffling.
+    Schema bytes are included in each tuple so each batch is
+    self-contained and can be decoded independently.
+    """
+    schema_bytes = b''
+    batch_count = 0
+    t0 = time.time()
+    for response in self._storage_client.read_rows(stream_name):
+      if not schema_bytes and response.arrow_schema.serialized_schema:
+        schema_bytes = bytes(response.arrow_schema.serialized_schema)
+      batch_bytes = response.arrow_record_batch.serialized_record_batch
+      if batch_bytes and schema_bytes:
+        yield (schema_bytes, bytes(batch_bytes))
+        batch_count += 1
+    elapsed = time.time() - t0
+    _LOGGER.info('[Read] raw_read: %d batches in %.2fs', batch_count, elapsed)
+
+
+class _DecompressArrowBatchesFn(beam.DoFn):
+  """Decompress and convert raw Arrow batches to timestamped row dicts.
+
+  Receives GBK output: (shard_key, Iterable[(schema_bytes, batch_bytes)])
+  and converts each batch to individual row dicts with event timestamps
+  extracted from the change_timestamp column.
+  """
+  def __init__(self, change_timestamp_column: str = 'change_timestamp') -> None:
+    self._change_timestamp_column = change_timestamp_column
+
+  def process(
+      self, element: Tuple[int, Iterable[Tuple[bytes, bytes]]]
+  ) -> Iterable[Dict[str, Any]]:
+    _, batches = element
+    for schema_bytes, batch_bytes in batches:
+      schema = pyarrow.ipc.read_schema(pyarrow.py_buffer(schema_bytes))
+      batch = pyarrow.ipc.read_record_batch(
+          pyarrow.py_buffer(batch_bytes), schema)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The Arrow schema is re-read from `schema_bytes` for every single batch in 
the shard. Since `schema_bytes` is identical for all batches within a stream, 
this is redundant CPU work. Consider caching the decoded schema object within 
the `DoFn` instance using the `schema_bytes` as a key to improve performance 
during decompression.
   
   <details>
   <summary>References</summary>
   
   1. Identify performance bottlenecks and optimize for efficiency by avoiding 
redundant calculations.
   </details>
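
A minimal sketch of such a cache, under stated assumptions: `DecodedSchemaCache` and its `decode` parameter are hypothetical names, not part of the PR. In the `DoFn`, `decode` would presumably be `lambda b: pyarrow.ipc.read_schema(pyarrow.py_buffer(b))`, taken from the existing `process` body; the sketch takes it as a parameter so that it runs without `pyarrow`.

```python
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar('T')


class DecodedSchemaCache(Generic[T]):
  """Caches decoded schema objects keyed by their serialized bytes."""
  def __init__(
      self, decode: Callable[[bytes], T], max_entries: int = 16) -> None:
    self._decode = decode
    self._max_entries = max_entries
    self._cache: Dict[bytes, T] = {}

  def get(self, schema_bytes: bytes) -> T:
    if schema_bytes not in self._cache:
      if len(self._cache) >= self._max_entries:
        # Crude bound on memory: drop everything rather than track
        # recency. Distinct schemas per worker are expected to be few.
        self._cache.clear()
      self._cache[schema_bytes] = self._decode(schema_bytes)
    return self._cache[schema_bytes]
```

The cache would likely be created in `setup` rather than `__init__`, so that decoded schema objects are never pickled with the `DoFn`. Inside `process`, the per-batch `read_schema` call would then become a single `get(schema_bytes)` lookup.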



##########
sdks/python/apache_beam/io/gcp/bigquery_change_history.py:
##########
@@ -838,16 +911,19 @@ def process(
         _utc(element.range_end),
         table_key)
 
+    # Release the storage client so the gRPC channel doesn't go stale
+    # between process() calls. _ensure_client() will create a fresh one.
+    self._storage_client = None

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Setting `self._storage_client = None` at the end of every `process` call 
forces the gRPC channel and client to be recreated on the next call, that is, 
once per element (restriction) rather than once per `DoFn` instance. This may 
prevent stale channels, but creating gRPC channels is expensive. If the SDF is 
split into many small restrictions, the result could be high connection churn 
and degraded performance. It might be better to manage the client lifecycle in 
`setup` and `teardown`, or to use a more targeted approach to handle stale 
connections.
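
As an illustration of the more targeted approach, the sketch below keeps one client for the lifetime of the instance and recreates it only after a connection-level failure. It is a sketch under assumptions, not the PR's design: `ClientHolder`, `client_factory`, and `call` are hypothetical names, and `ConnectionError` stands in for whatever exception the real client raises on a stale channel.

```python
from typing import Any, Callable, Optional, Tuple, Type


class ClientHolder:
  """Keeps one client alive and recreates it only after a failure."""
  def __init__(
      self,
      client_factory: Callable[[], Any],
      stale_errors: Tuple[Type[BaseException], ...] = (ConnectionError, )
  ) -> None:
    self._client_factory = client_factory
    # Assumption: these exception types indicate a stale connection.
    self._stale_errors = stale_errors
    self._client: Optional[Any] = None

  def setup(self) -> None:
    self._ensure_client()

  def _ensure_client(self) -> Any:
    if self._client is None:
      self._client = self._client_factory()
    return self._client

  def call(self, fn: Callable[[Any], Any]) -> Any:
    """Runs fn(client), retrying once with a fresh client if stale."""
    try:
      return fn(self._ensure_client())
    except self._stale_errors:
      # Drop only the client that failed, then retry exactly once.
      self._client = None
      return fn(self._ensure_client())

  def teardown(self) -> None:
    self._client = None
```

With this shape, a healthy channel is reused across `process` calls, and the cost of a new channel is paid only when a call actually fails. A single retry is only safe for idempotent calls, so a streaming read that has already yielded rows would need additional handling.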


