ahmedabu98 commented on issue #34212:
URL: https://github.com/apache/beam/issues/34212#issuecomment-2706749991
<details>
<summary>Iceberg source test</summary>
```python
def test_streaming_write_read_cdc_pipeline_using_bqms(self):
  runner = self.test_pipeline.get_option('runner')
  if not runner or "dataflowrunner" not in runner.lower():
    self.skipTest(
        "CDC streaming source requires "
        "`beam:requirement:pardo:on_window_expiration:v1`, "
        "which is currently only supported by the Dataflow runner")

  bigquery_client = BigQueryWrapper()
  dataset_id = "py_managed_iceberg_bqms_test_" + str(int(time.time()))
  project = self.test_pipeline.get_option('project')
  bigquery_client.get_or_create_dataset(project, dataset_id)
  _LOGGER.info("Created dataset %s in project %s", dataset_id, project)

  catalog_props = {
      "warehouse": self.WAREHOUSE,
      "gcp_project": project,
      "gcp_location": "us-central1",
      "catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
      "io-impl": "org.apache.iceberg.gcp.gcs.GCSFileIO"
  }

  first_table = dataset_id + ".first"
  rows = [self._create_row(i) for i in range(3)]
  expected_dicts = [row.as_dict() for row in rows]

  # Reduce test time by forcing DirectRunner on pipelines that don't need
  # Dataflow.
  args_with_directrunner = PipelineOptions(self.args).get_all_options()
  args_with_directrunner['runner'] = 'DirectRunner'

  # First, prepare an initial table (uses DirectRunner).
  with beam.Pipeline(
      options=PipelineOptions(**args_with_directrunner)) as write_pipeline:
    first_write_config = {
        "table": first_table,
        "catalog_properties": catalog_props
    }
    _ = (
        write_pipeline
        | beam.Create(rows)
        | beam.managed.Write(
            beam.managed.ICEBERG, config=first_write_config))

  # The intended test: stream a CDC read from the first table, validate the
  # records, and stream them to a second table (uses DataflowRunner).
  second_table = dataset_id + ".second"
  with beam.Pipeline(argv=self.args) as read_pipeline:
    first_read_config = {
        "table": first_table,
        "catalog_properties": catalog_props,
        "streaming": True,
        "to_timestamp": int(time.time() * 1000)
    }
    second_write_config = {
        "table": second_table,
        "catalog_properties": catalog_props,
        "triggering_frequency_seconds": 5
    }

    output_cdc_rows = (
        read_pipeline
        | beam.managed.Read(
            beam.managed.ICEBERG_CDC, config=first_read_config))
    output_cdc_dicts = output_cdc_rows | beam.Map(
        lambda row: {
            "operation": row.operation, "record": row.record._asdict()
        })

    _ = (
        output_cdc_rows
        | "Extract records" >> beam.Map(
            lambda row: row.record).with_output_types(
                RowTypeConstraint.from_fields([
                    ("int_", int),
                    ("str_", str),
                    ("bytes_", bytes),
                    ("bool_", bool),
                    ("float_", float)
                ]))
        | beam.managed.Write(
            beam.managed.ICEBERG, config=second_write_config))

    expected_cdc_dicts = [{
        "record": record, "operation": "append"
    } for record in expected_dicts]
    assert_that(output_cdc_dicts, equal_to(expected_cdc_dicts))

  # Batch read from the second table and validate records (uses DirectRunner).
  with beam.Pipeline(
      options=PipelineOptions(**args_with_directrunner)) as read_pipeline:
    second_read_config = {
        "table": second_table,
        "catalog_properties": catalog_props
    }
    output_dicts = (
        read_pipeline
        | beam.managed.Read(
            beam.managed.ICEBERG, config=second_read_config)
        | beam.Map(lambda row: row._asdict()))
    assert_that(output_dicts, equal_to(expected_dicts))

  bigquery_client._delete_dataset(project, dataset_id)
```
</details>
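For reference, the test relies on a `_create_row(i)` helper that isn't shown above. A minimal sketch of what it might look like, assuming it simply builds a `beam.Row` whose fields match the `RowTypeConstraint` declared in the test (the concrete values below are illustrative only):

```python
# Hypothetical _create_row helper assumed by the test above; field names and
# types mirror the test's RowTypeConstraint, the values are illustrative.
def _create_row(self, num: int):
  return beam.Row(
      int_=num,
      str_="value_%d" % num,
      bytes_=("value_%d" % num).encode("utf-8"),
      bool_=bool(num % 2),
      float_=num + 0.1)
```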
The test passes when adding the `trigger.py` changes from #14060.