gemini-code-assist[bot] commented on code in PR #38404: URL: https://github.com/apache/beam/pull/38404#discussion_r3324602573
########## sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding.py: ########## @@ -0,0 +1,266 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Categorical encoding pipeline using MLTransform for batch processing. + +This pipeline demonstrates MLTransform's ComputeAndApplyVocabulary transform +for categorical feature encoding. It can either read input data from a file +or generate synthetic test data, computes vocabulary on categorical columns, +and converts categorical values to integer indices. + +Example usage with input file: + python mltransform_one_hot_encoding.py \ + --input_file=gs://bucket/input.jsonl \ + --output_file=gs://bucket/output.jsonl \ + --artifact_location=gs://bucket/artifacts \ + --categorical_columns=category \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=us-central1 \ + --temp_location=gs://bucket/temp + +Example usage with synthetic data: + python mltransform_one_hot_encoding.py \ + --output_file=gs://bucket/output.jsonl \ + --categorical_columns=category \ + --num_records=100000 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=us-central1 +""" + +import argparse +import json +import logging +import tempfile +from typing import Any + +import apache_beam as beam +from apache_beam.ml.transforms.base import MLTransform +from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary +from apache_beam.runners.runner import PipelineResult + + +def parse_json_line(line: str) -> dict[str, Any]: + """Parse a JSON line into a dictionary.""" + try: + return json.loads(line) + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse JSON line: {line[:200]}...") from e + + +def parse_text_line(line: str, + categorical_columns: list[str]) -> dict[str, Any]: + """Parse plain text line into the first categorical column.""" + text_value = line.strip() + if not text_value: + text_value = 'unknown' + return {categorical_columns[0]: text_value} + + +def format_json_output(element: Any) -> str: + """Format output element as JSON string.""" + def to_json_compatible(value: Any) -> Any: + """Recursively convert non-JSON types (e.g. numpy arrays/scalars).""" + if isinstance(value, dict): + return {k: to_json_compatible(v) for k, v in value.items()} + if isinstance(value, (list, tuple)): + return [to_json_compatible(v) for v in value] + + # MLTransform outputs may include numpy scalar/ndarray values. + if hasattr(value, 'tolist'): + return to_json_compatible(value.tolist()) + if hasattr(value, 'item'): + try: + return to_json_compatible(value.item()) + except (TypeError, ValueError): + pass + return value + + if hasattr(element, 'as_dict'): + return json.dumps(to_json_compatible(element.as_dict())) + if hasattr(element, '_asdict'): + return json.dumps(to_json_compatible(element._asdict())) + return json.dumps(to_json_compatible(dict(element))) + + +def generate_synthetic_record(index: int, + categorical_columns: list[str]) -> dict[str, str]: + """Generate a deterministic synthetic record with categorical values.""" + categories = [ + 'electronics', + 'clothing', + 'food', + 'books', + 'sports', + 'home', + 'toys', + 'health', + 'automotive', + 'garden' + ] + colors = [ + 'red', + 'blue', + 'green', + 'yellow', + 'black', + 'white', + 'purple', + 'orange', + 'pink', + 'gray' + ] + sizes = ['small', 'medium', 'large', 'xlarge', 'tiny', 'huge'] + + record = {} + for col in categorical_columns: + if col.lower() in ['category', 'type', 'product']: + record[col] = categories[index % len(categories)] + elif col.lower() in ['color', 'colour']: + record[col] = colors[index % len(colors)] + elif col.lower() in ['size', 'dimension']: + record[col] = sizes[index % len(sizes)] + else: + # Default to categories for unknown columns + record[col] = categories[index % len(categories)] + return record + + +def run( + argv=None, + save_main_session=True, + test_pipeline=None) -> PipelineResult | None: + """Run the categorical encoding pipeline.""" + known_args, pipeline_args = parse_known_args(argv) + + categorical_columns = [ + col.strip() for col in known_args.categorical_columns.split(',') + ] + + if not categorical_columns or not categorical_columns[0]: + raise ValueError("At least one categorical column must be specified") + + if not known_args.output_file: + raise ValueError("--output_file is required") + + # Create artifact location if not provided + artifact_location = known_args.artifact_location + if not artifact_location: + artifact_location = tempfile.mkdtemp() + logging.info("Using temporary artifact location: %s", artifact_location) + + pipeline_options = beam.options.pipeline_options.PipelineOptions( + pipeline_args) + pipeline_options.view_as( + beam.options.pipeline_options.SetupOptions + ).save_main_session = save_main_session Review Comment:  When running on a distributed runner like `DataflowRunner`, using a local temporary directory (`tempfile.mkdtemp()`) as the `artifact_location` will fail because remote workers cannot access the local filesystem of the submission machine. It is highly recommended to check if a GCP temporary location is configured in `PipelineOptions` and use it as a fallback instead. ```suggestion pipeline_options = beam.options.pipeline_options.PipelineOptions( pipeline_args) pipeline_options.view_as( beam.options.pipeline_options.SetupOptions ).save_main_session = save_main_session # Create artifact location if not provided artifact_location = known_args.artifact_location if not artifact_location: gcp_options = pipeline_options.view_as( beam.options.pipeline_options.GoogleCloudOptions) if gcp_options.temp_location: artifact_location = gcp_options.temp_location + '/mltransform_artifacts' else: artifact_location = tempfile.mkdtemp() logging.info("Using artifact location: %s", artifact_location) ``` ########## sdks/python/apache_beam/testing/benchmarks/inference/mltransform_one_hot_encoding_benchmark.py: ########## @@ -0,0 +1,197 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pytype: skip-file + +"""Benchmark test for MLTransform One-Hot Encoding pipeline. + +This benchmark measures the performance of MLTransform for one-hot encoding +categorical features, including throughput, latency, and cost metrics on +Dataflow. +""" + +import logging + +from google.cloud import monitoring_v3 +from google.protobuf.duration_pb2 import Duration + +from apache_beam.examples.ml_transform import mltransform_one_hot_encoding +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.options.pipeline_options import WorkerOptions +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark +from apache_beam.testing.load_tests.load_test import LoadTestOptions + + +class MLTransformOneHotEncodingOptions( + LoadTestOptions, + StandardOptions, + GoogleCloudOptions, + WorkerOptions, + DebugOptions, + SetupOptions, +): + """Pipeline options for MLTransform One-Hot Encoding benchmark.""" + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--input_file', + default='', + help='Input JSONL/text file path for benchmark data.') + parser.add_argument( + '--input_format', + choices=['jsonl', 'text'], + default='jsonl', + help='Input file format for input_file: jsonl or text.') + parser.add_argument( + '--output_file', + required=True, + help='Output file path for encoded results') + parser.add_argument( + '--artifact_location', + required=True, + help='GCS path to store MLTransform artifacts') + parser.add_argument( + '--categorical_columns', + default='category', + help='Comma-separated list of categorical column names to encode') + parser.add_argument( + '--num_records', + type=int, + default=100000, + help='Number of synthetic records to generate') + + +class MLTransformOneHotEncodingBenchmarkTest(DataflowCostBenchmark): + """Benchmark for MLTransform One-Hot Encoding on Dataflow. + + This benchmark measures: + - Throughput: Elements processed per second + - Latency: Time to process input records + - Cost: Estimated cost on Dataflow + + The pcollection is chosen to capture the output of the MLTransform + step where one-hot encoding is applied. + """ + options_class = MLTransformOneHotEncodingOptions + + def __init__(self): + self.metrics_namespace = 'BeamML_MLTransform' + # Use the output of MLTransform step for throughput measurement + # This captures the processed data after vocabulary encoding + super().__init__( + metrics_namespace=self.metrics_namespace, + is_streaming=False, + pcollection='FormatOutput.out0') + + def test(self): + """Execute the one-hot encoding pipeline for benchmarking.""" + extra_opts = {} + + extra_opts['output_file'] = self.pipeline.get_option('output_file') + extra_opts['artifact_location'] = self.pipeline.get_option( + 'artifact_location') + extra_opts['categorical_columns'] = ( + self.pipeline.get_option('categorical_columns') or 'category') + + input_file = self.pipeline.get_option('input_file') + if input_file: + extra_opts['input_file'] = input_file + extra_opts['input_format'] = ( + self.pipeline.get_option('input_format') or 'jsonl') + else: + # Handle synthetic data generation + num_records = self.pipeline.get_option('num_records') + if num_records: + extra_opts['num_records'] = int(num_records) + + self.result = mltransform_one_hot_encoding.run( + self.pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=self.pipeline) + + def _get_throughput_metrics( + self, + project: str, + job_id: str, + start_time: str, + end_time: str, + pcollection_name: str | None = None, + ) -> dict[str, float]: + """Get throughput metrics with runner-v2-friendly fallbacks.""" + candidate_pcollections = [] + if pcollection_name: + candidate_pcollections.append(pcollection_name) + candidate_pcollections.extend([ + self.pcollection, + 'MLTransform.out0', + 'FormatOutput.out0', + ]) + + # Deduplicate while preserving order. + seen = set() + unique_candidates = [] + for name in candidate_pcollections: + if name and name not in seen: + seen.add(name) + unique_candidates.append(name) + + for name in unique_candidates: + metrics = super()._get_throughput_metrics( + project, job_id, start_time, end_time, pcollection_name=name) + if (metrics.get('AvgThroughputBytes', 0) > 0 or + metrics.get('AvgThroughputElements', 0) > 0): + return metrics + + # Final fallback: aggregate job-level throughput without pcollection label. + interval = monitoring_v3.TimeInterval( + start_time=start_time, end_time=end_time) + aggregation = monitoring_v3.Aggregation( + alignment_period=Duration(seconds=60), + per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN) + requests = { + "Bytes": monitoring_v3.ListTimeSeriesRequest( + name=f"projects/{project}", + filter=( + 'metric.type="dataflow.googleapis.com/job/estimated_byte_count" ' + f'AND metric.labels.job_id="{job_id}"'), + interval=interval, + aggregation=aggregation), + "Elements": monitoring_v3.ListTimeSeriesRequest( + name=f"projects/{project}", + filter=( + 'metric.type="dataflow.googleapis.com/job/element_count" ' + f'AND metric.labels.job_id="{job_id}"'), + interval=interval, + aggregation=aggregation), + } Review Comment:  The metric types `dataflow.googleapis.com/job/estimated_byte_count` and `dataflow.googleapis.com/job/element_count` do not exist in Google Cloud Monitoring for Dataflow. Throughput metrics are tracked at the PCollection level. You should use `dataflow.googleapis.com/pcollection/estimated_byte_count` and `dataflow.googleapis.com/pcollection/elements_produced_count` instead. ```python requests = { "Bytes": monitoring_v3.ListTimeSeriesRequest( name=f"projects/{project}", filter=( 'metric.type="dataflow.googleapis.com/pcollection/estimated_byte_count" ' f'AND metric.labels.job_id="{job_id}"'), interval=interval, aggregation=aggregation), "Elements": monitoring_v3.ListTimeSeriesRequest( name=f"projects/{project}", filter=( 'metric.type="dataflow.googleapis.com/pcollection/elements_produced_count" ' f'AND metric.labels.job_id="{job_id}"'), interval=interval, aggregation=aggregation), } ``` ########## sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding.py: ########## @@ -0,0 +1,266 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Categorical encoding pipeline using MLTransform for batch processing. + +This pipeline demonstrates MLTransform's ComputeAndApplyVocabulary transform +for categorical feature encoding. It can either read input data from a file +or generate synthetic test data, computes vocabulary on categorical columns, +and converts categorical values to integer indices. + +Example usage with input file: + python mltransform_one_hot_encoding.py \ + --input_file=gs://bucket/input.jsonl \ + --output_file=gs://bucket/output.jsonl \ + --artifact_location=gs://bucket/artifacts \ + --categorical_columns=category \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=us-central1 \ + --temp_location=gs://bucket/temp + +Example usage with synthetic data: + python mltransform_one_hot_encoding.py \ + --output_file=gs://bucket/output.jsonl \ + --categorical_columns=category \ + --num_records=100000 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=us-central1 +""" + +import argparse +import json +import logging +import tempfile +from typing import Any + +import apache_beam as beam +from apache_beam.ml.transforms.base import MLTransform +from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary +from apache_beam.runners.runner import PipelineResult + + +def parse_json_line(line: str) -> dict[str, Any]: + """Parse a JSON line into a dictionary.""" + try: + return json.loads(line) + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse JSON line: {line[:200]}...") from e + + +def parse_text_line(line: str, + categorical_columns: list[str]) -> dict[str, Any]: + """Parse plain text line into the first categorical column.""" + text_value = line.strip() + if not text_value: + text_value = 'unknown' + return {categorical_columns[0]: text_value} + + +def format_json_output(element: Any) -> str: + """Format output element as JSON string.""" + def to_json_compatible(value: Any) -> Any: + """Recursively convert non-JSON types (e.g. numpy arrays/scalars).""" + if isinstance(value, dict): + return {k: to_json_compatible(v) for k, v in value.items()} + if isinstance(value, (list, tuple)): + return [to_json_compatible(v) for v in value] + + # MLTransform outputs may include numpy scalar/ndarray values. + if hasattr(value, 'tolist'): + return to_json_compatible(value.tolist()) + if hasattr(value, 'item'): + try: + return to_json_compatible(value.item()) + except (TypeError, ValueError): + pass + return value + + if hasattr(element, 'as_dict'): + return json.dumps(to_json_compatible(element.as_dict())) + if hasattr(element, '_asdict'): + return json.dumps(to_json_compatible(element._asdict())) + return json.dumps(to_json_compatible(dict(element))) + + +def generate_synthetic_record(index: int, + categorical_columns: list[str]) -> dict[str, str]: + """Generate a deterministic synthetic record with categorical values.""" + categories = [ + 'electronics', + 'clothing', + 'food', + 'books', + 'sports', + 'home', + 'toys', + 'health', + 'automotive', + 'garden' + ] + colors = [ + 'red', + 'blue', + 'green', + 'yellow', + 'black', + 'white', + 'purple', + 'orange', + 'pink', + 'gray' + ] + sizes = ['small', 'medium', 'large', 'xlarge', 'tiny', 'huge'] + + record = {} + for col in categorical_columns: + if col.lower() in ['category', 'type', 'product']: + record[col] = categories[index % len(categories)] + elif col.lower() in ['color', 'colour']: + record[col] = colors[index % len(colors)] + elif col.lower() in ['size', 'dimension']: + record[col] = sizes[index % len(sizes)] + else: + # Default to categories for unknown columns + record[col] = categories[index % len(categories)] + return record + + +def run( + argv=None, + save_main_session=True, + test_pipeline=None) -> PipelineResult | None: + """Run the categorical encoding pipeline.""" + known_args, pipeline_args = parse_known_args(argv) + + categorical_columns = [ + col.strip() for col in known_args.categorical_columns.split(',') + ] + + if not categorical_columns or not categorical_columns[0]: + raise ValueError("At least one categorical column must be specified") + + if not known_args.output_file: + raise ValueError("--output_file is required") + + # Create artifact location if not provided + artifact_location = known_args.artifact_location + if not artifact_location: + artifact_location = tempfile.mkdtemp() + logging.info("Using temporary artifact location: %s", artifact_location) + + pipeline_options = beam.options.pipeline_options.PipelineOptions( + pipeline_args) + pipeline_options.view_as( + beam.options.pipeline_options.SetupOptions + ).save_main_session = save_main_session + + pipeline = test_pipeline or beam.Pipeline(options=pipeline_options) + + # Use synthetic data or read from file + if known_args.input_file: + # Read and parse input data from file + if known_args.input_format == 'jsonl': + parse_input_fn = parse_json_line + else: + if len(categorical_columns) > 1: + logging.warning( + 'Input format is "text" but multiple categorical columns are ' + 'specified. Only the first column "%s" will be used for parsing.', + categorical_columns[0]) + parse_input_fn = lambda line: parse_text_line(line, categorical_columns) + raw_data = ( + pipeline + | 'ReadFromJSONL' >> beam.io.ReadFromText(known_args.input_file) + | 'ParseInput' >> beam.Map(parse_input_fn)) + else: + # Generate synthetic data + num_records = known_args.num_records or 100000 + logging.info("Generating %d synthetic records", num_records) + + raw_data = ( + pipeline + | 'GenerateSyntheticIndexes' >> beam.Create(range(num_records)) + | 'BuildSyntheticRecord' >> beam.Map( + lambda idx: generate_synthetic_record(idx, categorical_columns))) Review Comment:  Avoid using a `lambda` function here as well. You can pass `categorical_columns` directly as an additional argument to `beam.Map`. ```python | 'BuildSyntheticRecord' >> beam.Map( generate_synthetic_record, categorical_columns) ``` ########## sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding.py: ########## @@ -0,0 +1,266 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Categorical encoding pipeline using MLTransform for batch processing. + +This pipeline demonstrates MLTransform's ComputeAndApplyVocabulary transform +for categorical feature encoding. It can either read input data from a file +or generate synthetic test data, computes vocabulary on categorical columns, +and converts categorical values to integer indices. + +Example usage with input file: + python mltransform_one_hot_encoding.py \ + --input_file=gs://bucket/input.jsonl \ + --output_file=gs://bucket/output.jsonl \ + --artifact_location=gs://bucket/artifacts \ + --categorical_columns=category \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=us-central1 \ + --temp_location=gs://bucket/temp + +Example usage with synthetic data: + python mltransform_one_hot_encoding.py \ + --output_file=gs://bucket/output.jsonl \ + --categorical_columns=category \ + --num_records=100000 \ + --runner=DataflowRunner \ + --project=PROJECT \ + --region=us-central1 +""" + +import argparse +import json +import logging +import tempfile +from typing import Any + +import apache_beam as beam +from apache_beam.ml.transforms.base import MLTransform +from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary +from apache_beam.runners.runner import PipelineResult + + +def parse_json_line(line: str) -> dict[str, Any]: + """Parse a JSON line into a dictionary.""" + try: + return json.loads(line) + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse JSON line: {line[:200]}...") from e + + +def parse_text_line(line: str, + categorical_columns: list[str]) -> dict[str, Any]: + """Parse plain text line into the first categorical column.""" + text_value = line.strip() + if not text_value: + text_value = 'unknown' + return {categorical_columns[0]: text_value} + + +def format_json_output(element: Any) -> str: + """Format output element as JSON string.""" + def to_json_compatible(value: Any) -> Any: + """Recursively convert non-JSON types (e.g. numpy arrays/scalars).""" + if isinstance(value, dict): + return {k: to_json_compatible(v) for k, v in value.items()} + if isinstance(value, (list, tuple)): + return [to_json_compatible(v) for v in value] + + # MLTransform outputs may include numpy scalar/ndarray values. + if hasattr(value, 'tolist'): + return to_json_compatible(value.tolist()) + if hasattr(value, 'item'): + try: + return to_json_compatible(value.item()) + except (TypeError, ValueError): + pass + return value + + if hasattr(element, 'as_dict'): + return json.dumps(to_json_compatible(element.as_dict())) + if hasattr(element, '_asdict'): + return json.dumps(to_json_compatible(element._asdict())) + return json.dumps(to_json_compatible(dict(element))) + + +def generate_synthetic_record(index: int, + categorical_columns: list[str]) -> dict[str, str]: + """Generate a deterministic synthetic record with categorical values.""" + categories = [ + 'electronics', + 'clothing', + 'food', + 'books', + 'sports', + 'home', + 'toys', + 'health', + 'automotive', + 'garden' + ] + colors = [ + 'red', + 'blue', + 'green', + 'yellow', + 'black', + 'white', + 'purple', + 'orange', + 'pink', + 'gray' + ] + sizes = ['small', 'medium', 'large', 'xlarge', 'tiny', 'huge'] + + record = {} + for col in categorical_columns: + if col.lower() in ['category', 'type', 'product']: + record[col] = categories[index % len(categories)] + elif col.lower() in ['color', 'colour']: + record[col] = colors[index % len(colors)] + elif col.lower() in ['size', 'dimension']: + record[col] = sizes[index % len(sizes)] + else: + # Default to categories for unknown columns + record[col] = categories[index % len(categories)] + return record + + +def run( + argv=None, + save_main_session=True, + test_pipeline=None) -> PipelineResult | None: + """Run the categorical encoding pipeline.""" + known_args, pipeline_args = parse_known_args(argv) + + categorical_columns = [ + col.strip() for col in known_args.categorical_columns.split(',') + ] + + if not categorical_columns or not categorical_columns[0]: + raise ValueError("At least one categorical column must be specified") + + if not known_args.output_file: + raise ValueError("--output_file is required") + + # Create artifact location if not provided + artifact_location = known_args.artifact_location + if not artifact_location: + artifact_location = tempfile.mkdtemp() + logging.info("Using temporary artifact location: %s", artifact_location) + + pipeline_options = beam.options.pipeline_options.PipelineOptions( + pipeline_args) + pipeline_options.view_as( + beam.options.pipeline_options.SetupOptions + ).save_main_session = save_main_session + + pipeline = test_pipeline or beam.Pipeline(options=pipeline_options) + + # Use synthetic data or read from file + if known_args.input_file: + # Read and parse input data from file + if known_args.input_format == 'jsonl': + parse_input_fn = parse_json_line + else: + if len(categorical_columns) > 1: + logging.warning( + 'Input format is "text" but multiple categorical columns are ' + 'specified. Only the first column "%s" will be used for parsing.', + categorical_columns[0]) + parse_input_fn = lambda line: parse_text_line(line, categorical_columns) + raw_data = ( + pipeline + | 'ReadFromJSONL' >> beam.io.ReadFromText(known_args.input_file) + | 'ParseInput' >> beam.Map(parse_input_fn)) Review Comment:  Avoid using `lambda` functions inside `beam.Map` to prevent potential serialization issues with `dill` or `pickle` on distributed runners. Additionally, when the input format is `text`, naming the transform `ReadFromJSONL` is misleading. We can clean this up by passing the arguments directly to `beam.Map` and using appropriate transform names. ```suggestion if known_args.input_format == 'jsonl': raw_data = ( pipeline | 'ReadFromJSONL' >> beam.io.ReadFromText(known_args.input_file) | 'ParseInput' >> beam.Map(parse_json_line)) else: if len(categorical_columns) > 1: logging.warning( 'Input format is "text" but multiple categorical columns are ' 'specified. Only the first column "%s" will be used for parsing.', categorical_columns[0]) raw_data = ( pipeline | 'ReadFromText' >> beam.io.ReadFromText(known_args.input_file) | 'ParseInput' >> beam.Map(parse_text_line, categorical_columns)) ``` ########## website/www/site/data/performance.yaml: ########## @@ -298,4 +298,20 @@ looks: - id: tPYwJngPDBsKjK3DNgGHGsCyTpxdfmTB title: AvgThroughputBytesPerSec by Version - id: cC75NnCbQT3mQmKVHtDxzptXpwPb64qz - title: AvgThroughputElementsPerSec by Version + title: AvgThroughputElementsPerSec by Version + mltransformonehot: + write: + folder: 108 + cost: + - id: 37DYwfbr5y4gt7g7g7RzRSzGTjd4Jjbj + title: RunTime and EstimatedCost + date: + - id: trcXrBCyPjYj2jTGc3px3d72xMXPCmZb + title: AvgThroughputBytesPerSec by Date + - id: yczZ4G8rYcP3SHjtQXz7p4BdRhGcXydx + title: AvgThroughputElementsPerSec by Date + version: + - id: mHtwwXKndpJVPjnkcHqRZpfPzfzHtT38 + title: AvgThroughputBytesPerSec by Version + - id: Cn5FsXkdy2ZXCxCJshSCxcjsTW3TXf3c Review Comment:  The second Look under the `version` section is missing its `title` field. Please add the title to ensure it renders correctly on the performance dashboard. ```yaml - id: Cn5FsXkdy2ZXCxCJshSCxcjsTW3TXf3c title: AvgThroughputElementsPerSec by Version ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
