gemini-code-assist[bot] commented on code in PR #37647: URL: https://github.com/apache/beam/pull/37647#discussion_r2993326802
########## sdks/python/apache_beam/examples/inference/table_row_inference.py: ########## @@ -0,0 +1,369 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A pipeline that uses RunInference to perform inference on table rows. + +This pipeline demonstrates ML Pipelines #18: handling continuous new table +rows with RunInference using table input models. It reads structured data +(table rows) from a streaming source, performs inference while preserving +the table schema, and writes results to a table output. 
+ +The pipeline supports both streaming and batch modes: +- Streaming: Reads from Pub/Sub, applies windowing, writes via streaming inserts +- Batch: Reads from file, processes all data, writes via file loads + +Example usage for streaming: + python table_row_inference.py \ + --mode=streaming \ + --input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=REGION \ + --temp_location=gs://BUCKET/temp + +Example usage for batch: + python table_row_inference.py \ + --mode=batch \ + --input_file=gs://BUCKET/input.jsonl \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 + + # Batch with file output + python table_row_inference.py \ + --mode=batch \ + --input_file=data.jsonl \ + --output_file=predictions.jsonl \ + --model_path=model.pkl \ + --feature_columns=feature1,feature2,feature3 +""" + +import argparse +import hashlib +import json +import logging +from collections.abc import Iterable +from typing import Any +from typing import Optional + +import apache_beam as beam +import numpy as np +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.sklearn_inference import ModelFileType +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult + + +class TableRowModelHandler(SklearnModelHandlerNumpy): + """ModelHandler that processes table rows (beam.Row objects) for inference. 
+ + This handler extends SklearnModelHandlerNumpy to work with structured + table data represented as beam.Row objects. It extracts specified feature + columns from the row and converts them to numpy arrays for model input. + + Attributes: + feature_columns: List of column names to extract as features from input rows Review Comment: The indentation of the `Attributes` section in this docstring is incorrect. In the Google-style docstring convention this file uses (`Args:`, `Returns:`, `Yields:` and `Attributes:` sections), each entry must be indented one level deeper than its section heading. Note that PEP 257 itself does not prescribe a layout for these sections; the rule comes from the Google Python Style Guide.

```python
  Attributes:
    feature_columns: List of column names to extract as features from input rows
```

########## sdks/python/apache_beam/examples/inference/table_row_batch_example.py: ########## @@ -0,0 +1,194 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Complete batch inference example with sample data generation. + +This script demonstrates how to use the batch table row inference pipeline +with automatically generated sample data and model.
+ +Usage: + # Run complete local example + python table_row_batch_example.py + + # Run with custom parameters + python table_row_batch_example.py --num_rows=1000 --num_features=5 +""" + +import argparse +import json +import logging +import os +import pickle +import sys +import tempfile + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def create_sample_data_and_model(tmpdir, num_rows=100, num_features=3): + """Create sample model and data for testing. + + Args: + tmpdir: Temporary directory path + num_rows: Number of data rows to generate + num_features: Number of features per row + + Returns: + Tuple of (model_path, data_path, feature_columns) + """ Review Comment: The indentation of the `Args` and `Returns` sections in this docstring is incorrect. In the Google-style docstring convention this file uses, each argument and return-value entry must be indented one level deeper than its `Args:` or `Returns:` heading. Note that PEP 257 itself does not prescribe a layout for these sections; the rule comes from the Google Python Style Guide.

```suggestion
  Args:
    tmpdir: Temporary directory path
    num_rows: Number of data rows to generate
    num_features: Number of features per row

  Returns:
    Tuple of (model_path, data_path, feature_columns)
```

########## sdks/python/apache_beam/examples/inference/README.md: ########## @@ -968,4 +968,70 @@ and produce the following result in your output file location: An emperor penguin is an adorable creature that lives in Antarctica. ``` +--- +## Table row inference + +[`table_row_inference.py`](./table_row_inference.py) contains an implementation for a RunInference pipeline that processes structured table rows from a file or Pub/Sub, runs ML inference while preserving the table schema, and writes results to BigQuery. It supports both batch (file input) and streaming (Pub/Sub) modes.
+ +### Prerequisites for table row inference + +Install dependencies (or use `apache_beam/ml/inference/table_row_inference_requirements.txt` from the `sdks/python` directory): + +```sh +pip install apache-beam[gcp] scikit-learn google-cloud-pubsub +``` + +For streaming mode you need a Pub/Sub topic and subscription, a BigQuery dataset, and a GCS bucket for model and temp files. + +### Model and data for table row inference + +1. Create a scikit-learn model and sample data using the provided utilities: + +```sh +python -m apache_beam.examples.inference.table_row_inference_utils --action=create_model --output_path=model.pkl --num_features=3 +python -m apache_beam.examples.inference.table_row_inference_utils --action=generate_data --output_path=input_data.jsonl --num_rows=1000 --num_features=3 +``` + +2. Input data should be JSONL with an `id` field and feature columns, for example: + +```json +{"id": "row_1", "feature1": 1.5, "feature2": 2.3, "feature3": 3.7} +``` + +### Running `table_row_inference.py` (batch) + +To run the table row inference pipeline in batch mode locally: + +```sh +python -m apache_beam.examples.inference.table_row_inference \ + --mode=batch \ + --input_file=input_data.jsonl \ + --output_table=PROJECT:DATASET.predictions \ + --model_path=model.pkl \ + --feature_columns=feature1,feature2,feature3 \ + --runner=DirectRunner +``` + +### Running `table_row_inference.py` (streaming) + +For streaming mode, use a Pub/Sub subscription and DataflowRunner. 
Set up a topic and subscription first, then run: + +```sh +python -m apache_beam.examples.inference.table_row_inference \ + --mode=streaming \ + --input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \ + --output_table=PROJECT:DATASET.predictions \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=us-central1 \ + --temp_location=gs://BUCKET/temp \ + --staging_location=gs://BUCKET/staging +``` + +See the script for full pipeline options (window size, trigger interval, worker settings, etc.). + +Output is written to the BigQuery table with columns such as `row_key`, `prediction`, and the original input feature columns. + --- Review Comment: Please make sure this Markdown file ends with a newline character. POSIX defines a line as a sequence of characters terminated by a newline. Without a final newline, `git diff` reports "\ No newline at end of file", and tools that concatenate or process files line by line may mishandle the last line. ########## sdks/python/apache_beam/examples/inference/table_row_inference.py: ########## @@ -0,0 +1,369 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A pipeline that uses RunInference to perform inference on table rows.
+ +This pipeline demonstrates ML Pipelines #18: handling continuous new table +rows with RunInference using table input models. It reads structured data +(table rows) from a streaming source, performs inference while preserving +the table schema, and writes results to a table output. + +The pipeline supports both streaming and batch modes: +- Streaming: Reads from Pub/Sub, applies windowing, writes via streaming inserts +- Batch: Reads from file, processes all data, writes via file loads + +Example usage for streaming: + python table_row_inference.py \ + --mode=streaming \ + --input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=REGION \ + --temp_location=gs://BUCKET/temp + +Example usage for batch: + python table_row_inference.py \ + --mode=batch \ + --input_file=gs://BUCKET/input.jsonl \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 + + # Batch with file output + python table_row_inference.py \ + --mode=batch \ + --input_file=data.jsonl \ + --output_file=predictions.jsonl \ + --model_path=model.pkl \ + --feature_columns=feature1,feature2,feature3 +""" + +import argparse +import hashlib +import json +import logging +from collections.abc import Iterable +from typing import Any +from typing import Optional + +import apache_beam as beam +import numpy as np +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.sklearn_inference import ModelFileType +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options 
import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult + + +class TableRowModelHandler(SklearnModelHandlerNumpy): + """ModelHandler that processes table rows (beam.Row objects) for inference. + + This handler extends SklearnModelHandlerNumpy to work with structured + table data represented as beam.Row objects. It extracts specified feature + columns from the row and converts them to numpy arrays for model input. + + Attributes: + feature_columns: List of column names to extract as features from input rows + """ + def __init__( + self, + model_uri: str, + feature_columns: list[str], + model_file_type: ModelFileType = ModelFileType.PICKLE): + """Initialize the TableRowModelHandler. + + Args: + model_uri: Path to the saved model file (local or GCS) + feature_columns: List of column names to use as model features + model_file_type: Type of model file (PICKLE or JOBLIB) + """ + super().__init__(model_uri=model_uri, model_file_type=model_file_type) + self.feature_columns = feature_columns + + def run_inference( + self, + batch: list[beam.Row], + model: Any, + inference_args: Optional[dict[str, Any]] = None + ) -> Iterable[PredictionResult]: + """Run inference on a batch of beam.Row objects. 
+ + Args: + batch: List of beam.Row objects containing input features + model: Loaded sklearn model + inference_args: Optional additional arguments for inference + + Yields: + PredictionResult containing the original row and prediction + """ + features_array = [] + for row in batch: + row_dict = row._asdict() + features = [row_dict.get(col, 0.0) for col in self.feature_columns] + features_array.append(features) + + features_array = np.array(features_array, dtype=np.float32) + predictions = model.predict(features_array) + + for row, prediction in zip(batch, predictions): + yield PredictionResult( + example=row, inference=float(prediction), model_id=self._model_uri) + + +class FormatTableOutput(beam.DoFn): + """DoFn that formats inference results into table output schema. + + Takes PredictionResult objects from KeyedModelHandler and formats them + into dictionaries suitable for writing to BigQuery or other table outputs. + """ + def __init__(self, feature_columns: list[str]): + self.feature_columns = feature_columns + + def process( + self, element: tuple[str, PredictionResult]) -> Iterable[dict[str, Any]]: + """Process a keyed inference result into table output format. + + Args: + element: Tuple of (row_key, PredictionResult) + + Yields: + Dictionary with all input fields plus prediction and metadata Review Comment:  The indentation for the `Args` and `Yields` sections in the docstring is incorrect. According to PEP 257 (Docstring Conventions), the descriptions for arguments and yielded values should be indented to align with the start of their respective descriptions, typically one level deeper than the `Args` or `Yields` keyword itself. 
```python Args: element: Tuple of (row_key, PredictionResult) Yields: Dictionary with all input fields plus prediction and metadata ``` ########## sdks/python/apache_beam/examples/inference/table_row_inference.py: ########## @@ -0,0 +1,369 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A pipeline that uses RunInference to perform inference on table rows. + +This pipeline demonstrates ML Pipelines #18: handling continuous new table +rows with RunInference using table input models. It reads structured data +(table rows) from a streaming source, performs inference while preserving +the table schema, and writes results to a table output. 
+ +The pipeline supports both streaming and batch modes: +- Streaming: Reads from Pub/Sub, applies windowing, writes via streaming inserts +- Batch: Reads from file, processes all data, writes via file loads + +Example usage for streaming: + python table_row_inference.py \ + --mode=streaming \ + --input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=REGION \ + --temp_location=gs://BUCKET/temp + +Example usage for batch: + python table_row_inference.py \ + --mode=batch \ + --input_file=gs://BUCKET/input.jsonl \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 + + # Batch with file output + python table_row_inference.py \ + --mode=batch \ + --input_file=data.jsonl \ + --output_file=predictions.jsonl \ + --model_path=model.pkl \ + --feature_columns=feature1,feature2,feature3 +""" + +import argparse +import hashlib +import json +import logging +from collections.abc import Iterable +from typing import Any +from typing import Optional + +import apache_beam as beam +import numpy as np +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.sklearn_inference import ModelFileType +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult + + +class TableRowModelHandler(SklearnModelHandlerNumpy): + """ModelHandler that processes table rows (beam.Row objects) for inference. 
+ + This handler extends SklearnModelHandlerNumpy to work with structured + table data represented as beam.Row objects. It extracts specified feature + columns from the row and converts them to numpy arrays for model input. + + Attributes: + feature_columns: List of column names to extract as features from input rows + """ + def __init__( + self, + model_uri: str, + feature_columns: list[str], + model_file_type: ModelFileType = ModelFileType.PICKLE): + """Initialize the TableRowModelHandler. + + Args: + model_uri: Path to the saved model file (local or GCS) + feature_columns: List of column names to use as model features + model_file_type: Type of model file (PICKLE or JOBLIB) + """ + super().__init__(model_uri=model_uri, model_file_type=model_file_type) + self.feature_columns = feature_columns + + def run_inference( + self, + batch: list[beam.Row], + model: Any, + inference_args: Optional[dict[str, Any]] = None + ) -> Iterable[PredictionResult]: + """Run inference on a batch of beam.Row objects. + + Args: + batch: List of beam.Row objects containing input features + model: Loaded sklearn model + inference_args: Optional additional arguments for inference + + Yields: + PredictionResult containing the original row and prediction + """ + features_array = [] + for row in batch: + row_dict = row._asdict() + features = [row_dict.get(col, 0.0) for col in self.feature_columns] + features_array.append(features) + + features_array = np.array(features_array, dtype=np.float32) + predictions = model.predict(features_array) + + for row, prediction in zip(batch, predictions): + yield PredictionResult( + example=row, inference=float(prediction), model_id=self._model_uri) + + +class FormatTableOutput(beam.DoFn): + """DoFn that formats inference results into table output schema. + + Takes PredictionResult objects from KeyedModelHandler and formats them + into dictionaries suitable for writing to BigQuery or other table outputs. 
+ """ + def __init__(self, feature_columns: list[str]): + self.feature_columns = feature_columns + + def process( + self, element: tuple[str, PredictionResult]) -> Iterable[dict[str, Any]]: + """Process a keyed inference result into table output format. + + Args: + element: Tuple of (row_key, PredictionResult) + + Yields: + Dictionary with all input fields plus prediction and metadata + """ + key, prediction = element + row = prediction.example + row_dict = row._asdict() + output = {'row_key': key, 'prediction': prediction.inference} + + if prediction.model_id: + output['model_id'] = prediction.model_id + + for field_name in self.feature_columns: + output[f'input_{field_name}'] = row_dict.get(field_name, 0.0) + + yield output + + +def parse_json_to_table_row( + message: bytes, + schema_fields: Optional[list[str]] = None) -> tuple[str, beam.Row]: + """Parse JSON message to (key, beam.Row) format for KeyedModelHandler. + + Args: + message: JSON-encoded bytes + schema_fields: Optional list of expected field names + + Returns: + Tuple of (unique_key, beam.Row with parsed data) + """ + data = json.loads(message.decode('utf-8')) + + row_key = data.get('id', hashlib.sha256(message).hexdigest()) + + row_fields = {} + for key, value in data.items(): + if key != 'id' and (schema_fields is None or key in schema_fields): + if isinstance(value, (int, float)): + row_fields[key] = float(value) + else: + row_fields[key] = value + + table_row = beam.Row(**row_fields) + return row_key, table_row + + +def build_output_schema(feature_columns: list[str]) -> str: + """Build BigQuery schema string for output table. 
+ + Args: + feature_columns: List of feature column names + + Returns: + BigQuery schema string + """ + schema_parts = ['row_key:STRING', 'prediction:FLOAT', 'model_id:STRING'] + + for col in feature_columns: + schema_parts.append(f'input_{col}:FLOAT') + + return ','.join(schema_parts) + + +def parse_known_args(argv): + """Parse command-line arguments for the pipeline.""" + parser = argparse.ArgumentParser() + + parser.add_argument( + '--mode', + default='batch', + choices=['streaming', 'batch'], + help='Pipeline mode: streaming or batch') + parser.add_argument( + '--input_subscription', + help='Pub/Sub subscription for streaming mode ' + '(format: projects/PROJECT/subscriptions/SUBSCRIPTION)') + parser.add_argument( + '--input_file', + help='Input file path for batch mode (e.g., gs://bucket/input.jsonl)') + parser.add_argument( + '--output_table', + help='BigQuery output table (format: PROJECT:DATASET.TABLE)') + parser.add_argument( + '--output_file', + help='Output file path (JSONL format) for batch mode. ' + 'Alternative to or in addition to output_table.') + parser.add_argument('--model_path', help='Path to saved model file') + parser.add_argument( + '--feature_columns', + required=True, + help='Comma-separated list of feature column names') + parser.add_argument( + '--window_size_sec', + type=int, + default=60, + help='Window size in seconds for streaming mode (default: 60)') + parser.add_argument( + '--trigger_interval_sec', + type=int, + default=30, + help='Trigger interval in seconds for streaming mode (default: 30)') + parser.add_argument( + '--input_expand_factor', + type=int, + default=1, + help='In batch mode: repeat each input line this many times to scale up ' + 'volume (e.g. 100k lines × 100 = 10M rows). Default 1 = no expansion.') + return parser.parse_known_args(argv) + + +def run( + argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: + """Main pipeline execution function. 
+ + Args: + argv: Command-line arguments + save_main_session: Whether to save main session for workers + test_pipeline: Optional test pipeline (for testing) + + Returns: + PipelineResult from pipeline execution Review Comment:  The indentation for the `Args` and `Returns` sections in the docstring is incorrect. According to PEP 257 (Docstring Conventions), the descriptions for arguments and return values should be indented to align with the start of their respective descriptions, typically one level deeper than the `Args` or `Returns` keyword itself. ```python Args: argv: Command-line arguments save_main_session: Whether to save main session for workers test_pipeline: Optional test pipeline (for testing) Returns: PipelineResult from pipeline execution ``` ########## sdks/python/apache_beam/examples/inference/table_row_inference.py: ########## @@ -0,0 +1,369 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A pipeline that uses RunInference to perform inference on table rows. + +This pipeline demonstrates ML Pipelines #18: handling continuous new table +rows with RunInference using table input models. 
It reads structured data +(table rows) from a streaming source, performs inference while preserving +the table schema, and writes results to a table output. + +The pipeline supports both streaming and batch modes: +- Streaming: Reads from Pub/Sub, applies windowing, writes via streaming inserts +- Batch: Reads from file, processes all data, writes via file loads + +Example usage for streaming: + python table_row_inference.py \ + --mode=streaming \ + --input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=REGION \ + --temp_location=gs://BUCKET/temp + +Example usage for batch: + python table_row_inference.py \ + --mode=batch \ + --input_file=gs://BUCKET/input.jsonl \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 + + # Batch with file output + python table_row_inference.py \ + --mode=batch \ + --input_file=data.jsonl \ + --output_file=predictions.jsonl \ + --model_path=model.pkl \ + --feature_columns=feature1,feature2,feature3 +""" + +import argparse +import hashlib +import json +import logging +from collections.abc import Iterable +from typing import Any +from typing import Optional + +import apache_beam as beam +import numpy as np +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.sklearn_inference import ModelFileType +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import 
PipelineResult + + +class TableRowModelHandler(SklearnModelHandlerNumpy): + """ModelHandler that processes table rows (beam.Row objects) for inference. + + This handler extends SklearnModelHandlerNumpy to work with structured + table data represented as beam.Row objects. It extracts specified feature + columns from the row and converts them to numpy arrays for model input. + + Attributes: + feature_columns: List of column names to extract as features from input rows + """ + def __init__( + self, + model_uri: str, + feature_columns: list[str], + model_file_type: ModelFileType = ModelFileType.PICKLE): + """Initialize the TableRowModelHandler. + + Args: + model_uri: Path to the saved model file (local or GCS) + feature_columns: List of column names to use as model features + model_file_type: Type of model file (PICKLE or JOBLIB) + """ + super().__init__(model_uri=model_uri, model_file_type=model_file_type) + self.feature_columns = feature_columns + + def run_inference( + self, + batch: list[beam.Row], + model: Any, + inference_args: Optional[dict[str, Any]] = None + ) -> Iterable[PredictionResult]: + """Run inference on a batch of beam.Row objects. + + Args: + batch: List of beam.Row objects containing input features + model: Loaded sklearn model + inference_args: Optional additional arguments for inference + + Yields: Review Comment:  The indentation for the `Args` and `Yields` sections in the docstring is incorrect. According to PEP 257 (Docstring Conventions), the descriptions for arguments and yielded values should be indented to align with the start of their respective descriptions, typically one level deeper than the `Args` or `Yields` keyword itself. 
```suggestion Args: batch: List of beam.Row objects containing input features model: Loaded sklearn model inference_args: Optional additional arguments for inference Yields: PredictionResult containing the original row and prediction ``` ########## sdks/python/apache_beam/examples/inference/table_row_inference.py: ########## @@ -0,0 +1,369 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A pipeline that uses RunInference to perform inference on table rows. + +This pipeline demonstrates ML Pipelines #18: handling continuous new table +rows with RunInference using table input models. It reads structured data +(table rows) from a streaming source, performs inference while preserving +the table schema, and writes results to a table output. 
+ +The pipeline supports both streaming and batch modes: +- Streaming: Reads from Pub/Sub, applies windowing, writes via streaming inserts +- Batch: Reads from file, processes all data, writes via file loads + +Example usage for streaming: + python table_row_inference.py \ + --mode=streaming \ + --input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=REGION \ + --temp_location=gs://BUCKET/temp + +Example usage for batch: + python table_row_inference.py \ + --mode=batch \ + --input_file=gs://BUCKET/input.jsonl \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 + + # Batch with file output + python table_row_inference.py \ + --mode=batch \ + --input_file=data.jsonl \ + --output_file=predictions.jsonl \ + --model_path=model.pkl \ + --feature_columns=feature1,feature2,feature3 +""" + +import argparse +import hashlib +import json +import logging +from collections.abc import Iterable +from typing import Any +from typing import Optional + +import apache_beam as beam +import numpy as np +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.sklearn_inference import ModelFileType +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult + + +class TableRowModelHandler(SklearnModelHandlerNumpy): + """ModelHandler that processes table rows (beam.Row objects) for inference. 
+ + This handler extends SklearnModelHandlerNumpy to work with structured + table data represented as beam.Row objects. It extracts specified feature + columns from the row and converts them to numpy arrays for model input. + + Attributes: + feature_columns: List of column names to extract as features from input rows + """ + def __init__( + self, + model_uri: str, + feature_columns: list[str], + model_file_type: ModelFileType = ModelFileType.PICKLE): + """Initialize the TableRowModelHandler. + + Args: + model_uri: Path to the saved model file (local or GCS) + feature_columns: List of column names to use as model features + model_file_type: Type of model file (PICKLE or JOBLIB) + """ + super().__init__(model_uri=model_uri, model_file_type=model_file_type) + self.feature_columns = feature_columns + + def run_inference( + self, + batch: list[beam.Row], + model: Any, + inference_args: Optional[dict[str, Any]] = None + ) -> Iterable[PredictionResult]: + """Run inference on a batch of beam.Row objects. + + Args: + batch: List of beam.Row objects containing input features + model: Loaded sklearn model + inference_args: Optional additional arguments for inference + + Yields: + PredictionResult containing the original row and prediction + """ + features_array = [] + for row in batch: + row_dict = row._asdict() + features = [row_dict.get(col, 0.0) for col in self.feature_columns] + features_array.append(features) + + features_array = np.array(features_array, dtype=np.float32) + predictions = model.predict(features_array) + + for row, prediction in zip(batch, predictions): + yield PredictionResult( + example=row, inference=float(prediction), model_id=self._model_uri) + + +class FormatTableOutput(beam.DoFn): + """DoFn that formats inference results into table output schema. + + Takes PredictionResult objects from KeyedModelHandler and formats them + into dictionaries suitable for writing to BigQuery or other table outputs. 
+ """ + def __init__(self, feature_columns: list[str]): + self.feature_columns = feature_columns + + def process( + self, element: tuple[str, PredictionResult]) -> Iterable[dict[str, Any]]: + """Process a keyed inference result into table output format. + + Args: + element: Tuple of (row_key, PredictionResult) + + Yields: + Dictionary with all input fields plus prediction and metadata + """ + key, prediction = element + row = prediction.example + row_dict = row._asdict() + output = {'row_key': key, 'prediction': prediction.inference} + + if prediction.model_id: + output['model_id'] = prediction.model_id + + for field_name in self.feature_columns: + output[f'input_{field_name}'] = row_dict.get(field_name, 0.0) + + yield output + + +def parse_json_to_table_row( + message: bytes, + schema_fields: Optional[list[str]] = None) -> tuple[str, beam.Row]: + """Parse JSON message to (key, beam.Row) format for KeyedModelHandler. + + Args: + message: JSON-encoded bytes + schema_fields: Optional list of expected field names + + Returns: + Tuple of (unique_key, beam.Row with parsed data) Review Comment:  The indentation for the `Args` and `Returns` sections in the docstring is incorrect. According to PEP 257 (Docstring Conventions), the descriptions for arguments and return values should be indented to align with the start of their respective descriptions, typically one level deeper than the `Args` or `Returns` keyword itself. ```python Args: message: JSON-encoded bytes schema_fields: Optional list of expected field names Returns: Tuple of (unique_key, beam.Row with parsed data) ``` ########## sdks/python/apache_beam/examples/inference/table_row_inference.py: ########## @@ -0,0 +1,369 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A pipeline that uses RunInference to perform inference on table rows. + +This pipeline demonstrates ML Pipelines #18: handling continuous new table +rows with RunInference using table input models. It reads structured data +(table rows) from a streaming source, performs inference while preserving +the table schema, and writes results to a table output. + +The pipeline supports both streaming and batch modes: +- Streaming: Reads from Pub/Sub, applies windowing, writes via streaming inserts +- Batch: Reads from file, processes all data, writes via file loads + +Example usage for streaming: + python table_row_inference.py \ + --mode=streaming \ + --input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=REGION \ + --temp_location=gs://BUCKET/temp + +Example usage for batch: + python table_row_inference.py \ + --mode=batch \ + --input_file=gs://BUCKET/input.jsonl \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 + + # Batch with file output + python table_row_inference.py \ + --mode=batch \ + --input_file=data.jsonl \ + --output_file=predictions.jsonl \ + --model_path=model.pkl \ + 
--feature_columns=feature1,feature2,feature3 +""" + +import argparse +import hashlib +import json +import logging +from collections.abc import Iterable +from typing import Any +from typing import Optional + +import apache_beam as beam +import numpy as np +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.sklearn_inference import ModelFileType +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult + + +class TableRowModelHandler(SklearnModelHandlerNumpy): + """ModelHandler that processes table rows (beam.Row objects) for inference. + + This handler extends SklearnModelHandlerNumpy to work with structured + table data represented as beam.Row objects. It extracts specified feature + columns from the row and converts them to numpy arrays for model input. + + Attributes: + feature_columns: List of column names to extract as features from input rows + """ + def __init__( + self, + model_uri: str, + feature_columns: list[str], + model_file_type: ModelFileType = ModelFileType.PICKLE): + """Initialize the TableRowModelHandler. + + Args: + model_uri: Path to the saved model file (local or GCS) + feature_columns: List of column names to use as model features + model_file_type: Type of model file (PICKLE or JOBLIB) + """ Review Comment:  The indentation for the `Args` section in the docstring is incorrect. According to PEP 257 (Docstring Conventions), the descriptions for arguments should be indented to align with the start of their respective descriptions, typically one level deeper than the `Args` keyword itself. 
```python Args: model_uri: Path to the saved model file (local or GCS) feature_columns: List of column names to use as model features model_file_type: Type of model file (PICKLE or JOBLIB) ``` ########## sdks/python/apache_beam/examples/inference/table_row_inference.py: ########## @@ -0,0 +1,369 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A pipeline that uses RunInference to perform inference on table rows. + +This pipeline demonstrates ML Pipelines #18: handling continuous new table +rows with RunInference using table input models. It reads structured data +(table rows) from a streaming source, performs inference while preserving +the table schema, and writes results to a table output. 
+ +The pipeline supports both streaming and batch modes: +- Streaming: Reads from Pub/Sub, applies windowing, writes via streaming inserts +- Batch: Reads from file, processes all data, writes via file loads + +Example usage for streaming: + python table_row_inference.py \ + --mode=streaming \ + --input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=REGION \ + --temp_location=gs://BUCKET/temp + +Example usage for batch: + python table_row_inference.py \ + --mode=batch \ + --input_file=gs://BUCKET/input.jsonl \ + --output_table=PROJECT:DATASET.TABLE \ + --model_path=gs://BUCKET/model.pkl \ + --feature_columns=feature1,feature2,feature3 + + # Batch with file output + python table_row_inference.py \ + --mode=batch \ + --input_file=data.jsonl \ + --output_file=predictions.jsonl \ + --model_path=model.pkl \ + --feature_columns=feature1,feature2,feature3 +""" + +import argparse +import hashlib +import json +import logging +from collections.abc import Iterable +from typing import Any +from typing import Optional + +import apache_beam as beam +import numpy as np +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.sklearn_inference import ModelFileType +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult + + +class TableRowModelHandler(SklearnModelHandlerNumpy): + """ModelHandler that processes table rows (beam.Row objects) for inference. 
+ + This handler extends SklearnModelHandlerNumpy to work with structured + table data represented as beam.Row objects. It extracts specified feature + columns from the row and converts them to numpy arrays for model input. + + Attributes: + feature_columns: List of column names to extract as features from input rows + """ + def __init__( + self, + model_uri: str, + feature_columns: list[str], + model_file_type: ModelFileType = ModelFileType.PICKLE): + """Initialize the TableRowModelHandler. + + Args: + model_uri: Path to the saved model file (local or GCS) + feature_columns: List of column names to use as model features + model_file_type: Type of model file (PICKLE or JOBLIB) + """ + super().__init__(model_uri=model_uri, model_file_type=model_file_type) + self.feature_columns = feature_columns + + def run_inference( + self, + batch: list[beam.Row], + model: Any, + inference_args: Optional[dict[str, Any]] = None + ) -> Iterable[PredictionResult]: + """Run inference on a batch of beam.Row objects. + + Args: + batch: List of beam.Row objects containing input features + model: Loaded sklearn model + inference_args: Optional additional arguments for inference + + Yields: + PredictionResult containing the original row and prediction + """ + features_array = [] + for row in batch: + row_dict = row._asdict() + features = [row_dict.get(col, 0.0) for col in self.feature_columns] + features_array.append(features) + + features_array = np.array(features_array, dtype=np.float32) + predictions = model.predict(features_array) + + for row, prediction in zip(batch, predictions): + yield PredictionResult( + example=row, inference=float(prediction), model_id=self._model_uri) + + +class FormatTableOutput(beam.DoFn): + """DoFn that formats inference results into table output schema. + + Takes PredictionResult objects from KeyedModelHandler and formats them + into dictionaries suitable for writing to BigQuery or other table outputs. 
+ """ + def __init__(self, feature_columns: list[str]): + self.feature_columns = feature_columns + + def process( + self, element: tuple[str, PredictionResult]) -> Iterable[dict[str, Any]]: + """Process a keyed inference result into table output format. + + Args: + element: Tuple of (row_key, PredictionResult) + + Yields: + Dictionary with all input fields plus prediction and metadata + """ + key, prediction = element + row = prediction.example + row_dict = row._asdict() + output = {'row_key': key, 'prediction': prediction.inference} + + if prediction.model_id: + output['model_id'] = prediction.model_id + + for field_name in self.feature_columns: + output[f'input_{field_name}'] = row_dict.get(field_name, 0.0) + + yield output + + +def parse_json_to_table_row( + message: bytes, + schema_fields: Optional[list[str]] = None) -> tuple[str, beam.Row]: + """Parse JSON message to (key, beam.Row) format for KeyedModelHandler. + + Args: + message: JSON-encoded bytes + schema_fields: Optional list of expected field names + + Returns: + Tuple of (unique_key, beam.Row with parsed data) + """ + data = json.loads(message.decode('utf-8')) + + row_key = data.get('id', hashlib.sha256(message).hexdigest()) + + row_fields = {} + for key, value in data.items(): + if key != 'id' and (schema_fields is None or key in schema_fields): + if isinstance(value, (int, float)): + row_fields[key] = float(value) + else: + row_fields[key] = value + + table_row = beam.Row(**row_fields) + return row_key, table_row + + +def build_output_schema(feature_columns: list[str]) -> str: + """Build BigQuery schema string for output table. + + Args: + feature_columns: List of feature column names + + Returns: + BigQuery schema string Review Comment:  The indentation for the `Args` and `Returns` sections in the docstring is incorrect. 
`Args:` and `Returns:` sections are a Google-style docstring convention; PEP 257 (Docstring Conventions) does not define them. Under that convention, each entry listed beneath `Args` or `Returns` should be indented one level deeper than the section keyword itself, for example:

```python
  Args:
    feature_columns: List of feature column names

  Returns:
    BigQuery schema string
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
