gemini-code-assist[bot] commented on code in PR #38696:
URL: https://github.com/apache/beam/pull/38696#discussion_r3304079053
##########
sdks/python/build.gradle:
##########
@@ -124,10 +124,36 @@ tasks.register("generateYamlDocs") {
outputs.file "${buildDir}/yaml-examples.html"
}
+tasks.register("installYamlIntegrationTestDeps") {
+  dependsOn installGcpTest
+  doLast {
+    exec {
+      executable 'sh'
+      args '-c', ". ${envdir}/bin/activate && " +
+          "py_ver=\$(python -c 'import sys; print(f\"{sys.version_info.major}{sys.version_info.minor}\")') && " +
+          "ml_extra=\"ml_test\" && " +
+          "if [ \"\$py_ver\" -ge 313 ]; then ml_extra=\"p\${py_ver}_ml_test\"; fi && " +
+          "constraint_file=\"\" && " +
+          "if [ -f \"container/ml/py\${py_ver}/base_image_requirements.txt\" ]; then " +
+          " constraint_file=\"container/ml/py\${py_ver}/base_image_requirements.txt\"; " +
+          "elif [ -f \"container/py\${py_ver}/base_image_requirements.txt\" ]; then " +
+          " constraint_file=\"container/py\${py_ver}/base_image_requirements.txt\"; " +
+          "fi && " +
+          "if [ -n \"\$constraint_file\" ]; then " +
+          " echo \"Installing with constraint file: \$constraint_file\" && " +
+          " pip install --pre --retries 10 -c \"\$constraint_file\" \${buildDir}/apache-beam.tar.gz[\$ml_extra,yaml,transformers]; " +
+          "else " +
+          " echo \"No constraint file found, installing without constraints\" && " +
+          " pip install --pre --retries 10 \${buildDir}/apache-beam.tar.gz[\$ml_extra,yaml,transformers]; " +
+          "fi"
Review Comment:

`buildDir` is a Gradle project property, so Gradle must interpolate it into
the string before the shell command runs. Escaping it as `\${buildDir}` passes
the literal text `${buildDir}` to the shell, which expands the unset shell
variable to an empty string. `pip` is then asked to install
`/apache-beam.tar.gz`, and the installation fails with a file-not-found error.
Removing the backslash lets Gradle resolve the path.
```
          "if [ -n \"\$constraint_file\" ]; then " +
          " echo \"Installing with constraint file: \$constraint_file\" && " +
          " pip install --pre --retries 10 -c \"\$constraint_file\" ${buildDir}/apache-beam.tar.gz[\$ml_extra,yaml,transformers]; " +
          "else " +
          " echo \"No constraint file found, installing without constraints\" && " +
          " pip install --pre --retries 10 ${buildDir}/apache-beam.tar.gz[\$ml_extra,yaml,transformers]; " +
          "fi"
```
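
The effect of the stray backslash can be reproduced outside Gradle. The sketch
below is a Python analogy, not the build script itself: an f-string stands in
for Gradle's string interpolation, `build_dir` is a hypothetical path, and `sh`
is assumed to be available on the `PATH`.

```python
import os
import subprocess

# Hypothetical build directory; stands in for Gradle's `buildDir` property.
build_dir = "/tmp/build"

# Environment for the child shell that deliberately has no `buildDir` variable.
env = {"PATH": os.environ.get("PATH", "/usr/bin:/bin")}


def run_sh(script: str) -> str:
    """Run `script` with sh and return its standard output."""
    result = subprocess.run(
        ["sh", "-c", script],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


# Escaped case: the literal text ${buildDir} reaches the shell, which expands
# the unset variable to an empty string.
escaped = run_sh('printf "%s" "${buildDir}/apache-beam.tar.gz"')

# Interpolated case: the path is substituted before the shell runs.
interpolated = run_sh(f'printf "%s" "{build_dir}/apache-beam.tar.gz"')

print(repr(escaped))
print(repr(interpolated))
```

In the escaped case the shell sees a path rooted at `/`, which is why `pip`
cannot find the tarball.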
##########
sdks/python/apache_beam/yaml/yaml_ml.py:
##########
@@ -282,6 +282,55 @@ def inference_output_type(self):
('model_id', Optional[str])])
+@ModelHandlerProvider.register_handler_type('HuggingFacePipeline')
+class HuggingFacePipelineProvider(ModelHandlerProvider):
+  def __init__(
+      self,
+      task: Optional[str] = None,
+      model: Optional[str] = None,
+      preprocess: Optional[dict[str, str]] = None,
+      postprocess: Optional[dict[str, str]] = None,
+      device: Optional[Any] = None,
+      inference_fn: Optional[dict[str, str]] = None,
+      load_pipeline_args: Optional[dict[str, Any]] = None,
+      **kwargs):
+    try:
+      from apache_beam.ml.inference.huggingface_inference import HuggingFacePipelineModelHandler
+    except ImportError:
+      raise ValueError(
+          'Unable to import HuggingFacePipelineModelHandler. Please '
+          'install transformers dependencies.')
+
+    kwargs = {k: v for k, v in kwargs.items() if not k.startswith('_')}
+
+    inference_fn_obj = self.parse_processing_transform(
+        inference_fn, 'inference_fn') if inference_fn else None
+
+    handler_kwargs = {}
+    if inference_fn_obj:
+      handler_kwargs['inference_fn'] = inference_fn_obj
+
+    _handler = HuggingFacePipelineModelHandler(
+        task=task,
+        model=model,
+        device=device,
+        load_pipeline_args=load_pipeline_args,
+        **handler_kwargs,
+        **kwargs)
+
+    super().__init__(_handler, preprocess, postprocess)
+
+  @staticmethod
+  def validate(config):
+    if not config.get('task') and not config.get('model'):
+      raise ValueError(
+          "HuggingFacePipeline requires either 'task' or "
+          "'model' to be specified.")
Review Comment:

If `config` is `None`, calling `config.get()` raises an `AttributeError`
instead of a validation error. An empty dict already fails the existing check,
so adding a `not config` guard mainly covers the `None` case and makes
validation fail with a clear error message.
```suggestion
  def validate(config):
    if not config or (not config.get('task') and not config.get('model')):
      raise ValueError(
          "HuggingFacePipeline requires either 'task' or "
          "'model' to be specified.")
```
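
A minimal standalone sketch of the difference, assuming nothing beyond the two
checks quoted above. The function names `validate_unguarded`,
`validate_guarded`, and `error_name` are hypothetical helpers for illustration
and are not part of Beam.

```python
from typing import Any, Optional

MESSAGE = (
    "HuggingFacePipeline requires either 'task' or "
    "'model' to be specified.")


def validate_unguarded(config: Optional[dict[str, Any]]) -> None:
    """Mirrors the check in the diff; assumes `config` is a dict."""
    if not config.get('task') and not config.get('model'):
        raise ValueError(MESSAGE)


def validate_guarded(config: Optional[dict[str, Any]]) -> None:
    """Mirrors the suggested check; tolerates `None` and empty configs."""
    if not config or (not config.get('task') and not config.get('model')):
        raise ValueError(MESSAGE)


def error_name(fn, config) -> Optional[str]:
    """Return the name of the exception `fn(config)` raises, or None."""
    try:
        fn(config)
    except Exception as exc:  # Broad on purpose: we only report the type.
        return type(exc).__name__
    return None


print(error_name(validate_unguarded, None))
print(error_name(validate_guarded, None))
print(error_name(validate_guarded, {'task': 'text-classification'}))
```

Only the `None` input behaves differently between the two versions; an empty
dict raises `ValueError` in both.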
##########
sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml:
##########
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pipelines:
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - text: "I love Apache Beam!"
+              - text: "I hate this error."
+        - type: RunInference
+          config:
+            model_handler:
+              type: "HuggingFacePipeline"
+              config:
+                task: "text-classification"
+                inference_fn:
+                  callable: |
+                    def real_inference(batch, pipeline, inference_args):
+                      predictions = pipeline(batch, **inference_args)
+
+                      # If it's a single dictionary (batch size of 1), wrap it in a list
+                      if isinstance(predictions, dict):
+                        predictions = [predictions]
+
+                      return {
+                        'label': [p['label'] for p in predictions],
+                        'score': [p['score'] for p in predictions]
+                      }
Review Comment:

There are two issues here:
1. `inference_args` can be `None` if not provided, which will cause
`**inference_args` to raise a `TypeError`.
2. `real_inference` returns a dictionary of lists. Since `RunInference`
expects an iterable of predictions corresponding to each element in the batch,
returning a dictionary will cause it to iterate over the dictionary's keys
(`'label'` and `'score'`). This will lead to a `TypeError` in the subsequent
`MapToFields` step when trying to access `x.inference.inference["label"]` on a
string. Returning a list of dictionaries resolves this.
```yaml
                    def real_inference(batch, pipeline, inference_args):
                      inference_args = inference_args or {}
                      predictions = pipeline(batch, **inference_args)
                      # If it's a single dictionary (batch size of 1), wrap it in a list
                      if isinstance(predictions, dict):
                        predictions = [predictions]
                      return [
                        {
                          'label': p['label'],
                          'score': p['score']
                        } for p in predictions
                      ]
```
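
The shape problem can be seen without Beam or `transformers`. In the sketch
below, `fake_pipeline` is a hypothetical stand-in for a Hugging Face pipeline
with placeholder labels and scores, and `zip` is a simplification of how
inputs are paired with predictions; it is not the actual `RunInference`
implementation.

```python
def fake_pipeline(batch, **kwargs):
    """Hypothetical stand-in for a pipeline: returns one dict per input."""
    return [
        {
            'label': 'POSITIVE' if 'love' in text else 'NEGATIVE',
            'score': 0.99,  # Placeholder value, not a real model score.
        }
        for text in batch
    ]


def dict_of_lists_inference(batch, pipeline, inference_args=None):
    """Shape returned by the code under review: a dict of lists."""
    predictions = pipeline(batch, **(inference_args or {}))
    return {
        'label': [p['label'] for p in predictions],
        'score': [p['score'] for p in predictions],
    }


def list_of_dicts_inference(batch, pipeline, inference_args=None):
    """Shape proposed in the review: one dict per batch element."""
    inference_args = inference_args or {}  # Tolerate a missing argument.
    predictions = pipeline(batch, **inference_args)
    if isinstance(predictions, dict):
        predictions = [predictions]
    return [{'label': p['label'], 'score': p['score']} for p in predictions]


batch = ["I love Apache Beam!", "I hate this error."]

# Iterating a dict yields its keys, so each input is paired with a string.
wrong_pairs = list(zip(batch, dict_of_lists_inference(batch, fake_pipeline)))

# Iterating a list of dicts yields one prediction per input.
right_pairs = list(zip(batch, list_of_dicts_inference(batch, fake_pipeline)))

print(wrong_pairs)
print(right_pairs)
```

With the dict-of-lists shape, a later lookup such as `prediction["label"]`
would be applied to the strings `'label'` and `'score'`, which is the
`TypeError` described above.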