gemini-code-assist[bot] commented on code in PR #38015:
URL: https://github.com/apache/beam/pull/38015#discussion_r3017042938
##########
sdks/python/apache_beam/io/gcp/bigquery_change_history.py:
##########
@@ -730,16 +738,75 @@ def _ensure_client(self) -> None:
def setup(self) -> None:
self._ensure_client()
+ def _split_all_streams(
+ self, stream_names: Tuple[str, ...],
+ max_split_rounds: int) -> Tuple[str, ...]:
+ """Split each stream at fraction=0.5 for up to max_split_rounds rounds.
+
+ Each round attempts to split every stream in the current list. A
+ successful split replaces the original stream with primary + remainder.
+ A refused split (both fields empty) keeps the original stream intact.
+ Stops when max_split_rounds is reached or a full round produces zero
+ new splits.
+
+ BQ's server-side granularity controls how many splits are possible.
+ Small tables may not split at all; large tables may allow multiple
+ rounds of doubling.
+ """
+ result = list(stream_names)
+ for round_num in range(1, max_split_rounds + 1):
+ new_result = []
+ made_progress = False
+ for name in result:
+ response = self._storage_client.split_read_stream(
+ request=bq_storage.types.SplitReadStreamRequest(
+ name=name, fraction=0.5))
Review Comment:

The `split_read_stream` call is performed synchronously within a nested loop
in `initial_restriction`. For large tables with many initial streams and
multiple split rounds, this could issue a significant number of RPCs,
potentially causing timeouts or hitting rate limits. Consider whether these
calls could be performed in parallel, or whether an explicit retry strategy
should be defined for this specific RPC.
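A minimal sketch of the parallel variant, assuming the per-round structure shown in the diff. The `split_stream` callable is a stand-in for the real `split_read_stream` RPC (returning `(primary, remainder)` on success or `None` when the server refuses the split); it is an assumption, not the actual client API surface:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Tuple

def split_round_parallel(
    stream_names: List[str],
    split_stream: Callable[[str], Optional[Tuple[str, str]]],
    max_workers: int = 8) -> Tuple[List[str], bool]:
  """Attempt one split round, issuing all split RPCs concurrently.

  Returns the new stream list (order-preserving) and whether any
  split succeeded, mirroring the made_progress flag in the diff.
  """
  with ThreadPoolExecutor(max_workers=max_workers) as pool:
    # pool.map preserves input order, so refused splits keep their slot.
    outcomes = list(pool.map(split_stream, stream_names))
  result: List[str] = []
  made_progress = False
  for name, outcome in zip(stream_names, outcomes):
    if outcome is None:
      result.append(name)  # refused split: keep the original stream
    else:
      primary, remainder = outcome
      result.extend([primary, remainder])
      made_progress = True
  return result, made_progress
```

Bounding `max_workers` keeps the RPC fan-out from growing with the stream count, which also limits pressure on server-side rate limits.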
##########
sdks/python/apache_beam/io/gcp/bigquery_change_history.py:
##########
@@ -925,6 +1001,59 @@ def _read_stream_batch(self, stream_name: str) -> Iterable[Dict[str, Any]]:
elapsed,
row_count / elapsed if elapsed > 0 else 0)
+ def _read_stream_raw(self, stream_name: str) -> Iterable[Tuple[bytes, bytes]]:
+ """Yield raw (schema_bytes, batch_bytes) without decompression.
+
+ Used when emit_raw_batches is enabled to defer decompression and
+ Arrow-to-Python conversion to a downstream DoFn after reshuffling.
+ Schema bytes are included in each tuple so each batch is
+ self-contained and can be decoded independently.
+ """
+ schema_bytes = b''
+ batch_count = 0
+ t0 = time.time()
+ for response in self._storage_client.read_rows(stream_name):
+ if not schema_bytes and response.arrow_schema.serialized_schema:
+ schema_bytes = bytes(response.arrow_schema.serialized_schema)
+ batch_bytes = response.arrow_record_batch.serialized_record_batch
+ if batch_bytes and schema_bytes:
+ yield (schema_bytes, bytes(batch_bytes))
+ batch_count += 1
+ elapsed = time.time() - t0
+ _LOGGER.info('[Read] raw_read: %d batches in %.2fs', batch_count, elapsed)
+
+
+class _DecompressArrowBatchesFn(beam.DoFn):
+ """Decompress and convert raw Arrow batches to timestamped row dicts.
+
+ Receives GBK output: (shard_key, Iterable[(schema_bytes, batch_bytes)])
+ and converts each batch to individual row dicts with event timestamps
+ extracted from the change_timestamp column.
+ """
+ def __init__(self, change_timestamp_column: str = 'change_timestamp') -> None:
+ self._change_timestamp_column = change_timestamp_column
+
+ def process(
+ self, element: Tuple[int, Iterable[Tuple[bytes, bytes]]]
+ ) -> Iterable[Dict[str, Any]]:
+ _, batches = element
+ for schema_bytes, batch_bytes in batches:
+ schema = pyarrow.ipc.read_schema(pyarrow.py_buffer(schema_bytes))
+ batch = pyarrow.ipc.read_record_batch(
+ pyarrow.py_buffer(batch_bytes), schema)
Review Comment:

The Arrow schema is re-read from `schema_bytes` for every single batch in
the shard. Since `schema_bytes` is identical for all batches within a stream,
this is redundant CPU work. Consider caching the decoded schema object within
the `DoFn` instance using the `schema_bytes` as a key to improve performance
during decompression.
<details>
<summary>References</summary>
1. Identify performance bottlenecks and optimize for efficiency by avoiding
redundant calculations.
</details>
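A minimal sketch of the suggested caching, decoupled from pyarrow for illustration. The `decode` callable stands in for `pyarrow.ipc.read_schema`; in the real `DoFn` the cache dict would live on the instance so each distinct schema is decoded once per worker rather than once per batch:

```python
from typing import Any, Callable, Dict

class SchemaCache:
  """Cache decoded schemas keyed by their serialized bytes.

  Since bytes objects are hashable, the raw serialized schema can be
  used directly as the dict key, with no extra hashing step.
  """
  def __init__(self, decode: Callable[[bytes], Any]) -> None:
    self._decode = decode
    self._cache: Dict[bytes, Any] = {}
    self.decode_calls = 0  # exposed for illustration only

  def get(self, schema_bytes: bytes) -> Any:
    if schema_bytes not in self._cache:
      self.decode_calls += 1
      self._cache[schema_bytes] = self._decode(schema_bytes)
    return self._cache[schema_bytes]
```

Because all batches in a stream share one schema, the cache typically holds a single entry, so unbounded growth is not a practical concern here.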
##########
sdks/python/apache_beam/io/gcp/bigquery_change_history.py:
##########
@@ -838,16 +911,19 @@ def process(
_utc(element.range_end),
table_key)
+ # Release the storage client so the gRPC channel doesn't go stale
+ # between process() calls. _ensure_client() will create a fresh one.
+ self._storage_client = None
Review Comment:

Setting `self._storage_client = None` at the end of every `process` call
will force the recreation of the gRPC channel and client for every bundle (or
restriction). While this might prevent stale channels, creating gRPC channels
is an expensive operation. If the SDF is split into many small restrictions,
this could lead to high connection churn and performance degradation. It might
be better to manage the client lifecycle in `setup` and `teardown`, or use a
more targeted approach to handle stale connections.
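A minimal sketch of the setup/teardown lifecycle suggested above. `make_client` is a hypothetical stand-in for the storage client factory (e.g. whatever `_ensure_client` constructs); on a real `beam.DoFn` these methods would map onto `setup`, `process`, and `teardown`:

```python
from typing import Any, Callable, Optional

class ClientLifecycle:
  """Create the client once in setup(), reuse it across process()
  calls, and release it only in teardown() -- rather than nulling
  it at the end of every process() call."""
  def __init__(self, make_client: Callable[[], Any]) -> None:
    self._make_client = make_client
    self._client: Optional[Any] = None

  def setup(self) -> None:
    if self._client is None:
      self._client = self._make_client()

  def process(self) -> Any:
    # Lazily recover if the client was released, but otherwise
    # reuse the long-lived channel instead of recreating it.
    if self._client is None:
      self.setup()
    return self._client

  def teardown(self) -> None:
    self._client = None
```

If stale channels are the underlying worry, a more targeted fix is to catch the specific gRPC error on use and recreate the client only then, keeping the common path churn-free.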
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]