This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3415378fc13 [YAML] A Streaming Inference Pipeline - YouTube Comments
Sentiment Analysis (#35375)
3415378fc13 is described below
commit 3415378fc134eb23bf714083815c041fce8db79f
Author: Charles Nguyen <[email protected]>
AuthorDate: Tue Jul 22 18:31:48 2025 -0400
[YAML] A Streaming Inference Pipeline - YouTube Comments Sentiment Analysis
(#35375)
* Add YAML streaming sentiment analysis pipeline
* WIP YAML example for streaming sentiment analysis pipeline
* Clean up
* Clean up
* Clean up
* Add comments and update README.md
* Fix rebase
* Fix test and lint
* Fix test
* Fix CI/CD
* Address comments
* Address comments and fix CI/CD
* Fix CI/CD
---
sdks/python/apache_beam/yaml/examples/README.md | 9 +-
.../yaml/examples/testing/examples_test.py | 142 +++++++++++-
.../yaml/examples/testing/input_data.py | 9 +
.../transforms/ml/sentiment_analysis/README.md | 93 ++++++++
.../streaming_sentiment_analysis.yaml | 257 +++++++++++++++++++++
5 files changed, 501 insertions(+), 9 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/examples/README.md
b/sdks/python/apache_beam/yaml/examples/README.md
index 70533655a6a..8f6decb6bb4 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -231,8 +231,13 @@ gcloud dataflow yaml run $JOB_NAME \
### ML
-These examples leverage the built-in `Enrichment` transform for performing
-ML enrichments.
+Examples that include the built-in `Enrichment` transform for performing
+ML enrichments:
+- [bigquery_enrichment.yaml](transforms/ml/enrichment/bigquery_enrichment.yaml)
+- [spanner_enrichment.yaml](transforms/ml/enrichment/spanner_enrichment.yaml)
+
+Examples that include the `RunInference` transform for ML inference:
+-
[streaming_sentiment_analysis.yaml](transforms/ml/inference/streaming_sentiment_analysis.yaml)
More information can be found about aggregation transforms
[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
index 3b46a9dda5d..205480b418a 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -37,8 +37,10 @@ import yaml
import apache_beam as beam
from apache_beam import PCollection
from apache_beam.examples.snippets.util import assert_matches_stdout
+from apache_beam.ml.inference.base import PredictionResult
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.utils import subprocess_server
from apache_beam.yaml import yaml_provider
from apache_beam.yaml import yaml_transform
@@ -120,6 +122,23 @@ def test_kafka_read(
bootstrap_servers,
auto_offset_reset_config,
consumer_config):
+ """
+ This PTransform simulates the behavior of the ReadFromKafka transform
+ with the RAW format by simply using some fixed sample text data and
+ encoding it to raw bytes.
+
+ Args:
+ pcoll: The input PCollection.
+ format: The format of the Kafka messages (e.g., 'RAW').
+ topic: The name of Kafka topic to read from.
+ bootstrap_servers: A list of Kafka bootstrap servers to connect to.
+ auto_offset_reset_config: A configuration for the auto offset reset.
+ consumer_config: A dictionary containing additional consumer configurations.
+
+ Returns:
+ A PCollection containing the sample text data in bytes.
+ """
+
return (
pcoll | beam.Create(input_data.text_data().split('\n'))
| beam.Map(lambda element: beam.Row(payload=element.encode('utf-8'))))
@@ -127,7 +146,7 @@ def test_kafka_read(
@beam.ptransform.ptransform_fn
def test_pubsub_read(
- pbegin,
+ pcoll,
topic: Optional[str] = None,
subscription: Optional[str] = None,
format: Optional[str] = None,
@@ -140,15 +159,58 @@ def test_pubsub_read(
pubsub_messages = input_data.pubsub_messages_data()
return (
- pbegin
+ pcoll
| beam.Create([json.loads(msg.data) for msg in pubsub_messages])
| beam.Map(lambda element: beam.Row(**element)))
[email protected]_fn
+def test_run_inference(pcoll, inference_tag, model_handler):
+ """
+ This PTransform simulates the behavior of the RunInference transform.
+
+ Args:
+ pcoll: The input PCollection.
+ inference_tag: The tag to use for the returned inference.
+ model_handler: A configuration for the respective ML model handler
+
+ Returns:
+ A PCollection containing the inference results.
+ """
+ def _fn(row):
+ input = row._asdict()
+
+ row = {
+ inference_tag: PredictionResult(
+ input['comment_text'],
+ [{
+ 'label': 'POSITIVE'
+ if 'happy' in input['comment_text'] else 'NEGATIVE',
+ 'score': 0.95
+ }]),
+ **input
+ }
+
+ return beam.Row(**row)
+
+ user_type = RowTypeConstraint.from_user_type(pcoll.element_type.user_type)
+ user_schema_fields = [(name, type(typ) if not isinstance(typ, type) else typ)
+ for (name,
+ typ) in user_type._fields] if user_type else []
+ inference_output_type = RowTypeConstraint.from_fields([
+ ('example', Any), ('inference', Any), ('model_id', Optional[str])
+ ])
+ schema = RowTypeConstraint.from_fields(
+ user_schema_fields + [(str(inference_tag), inference_output_type)])
+
+ return pcoll | beam.Map(_fn).with_output_types(schema)
+
+
TEST_PROVIDERS = {
'TestEnrichment': test_enrichment,
'TestReadFromKafka': test_kafka_read,
- 'TestReadFromPubSub': test_pubsub_read
+ 'TestReadFromPubSub': test_pubsub_read,
+ 'TestRunInference': test_run_inference
}
"""
Transforms not requiring inputs.
@@ -238,7 +300,12 @@ def create_test_method(
actual += list(transform.outputs.values())
check_output(expected)(actual)
- if 'deps' in pipeline_spec_file:
+ def _python_deps_involved(spec_filename):
+ return any(
+ substr in spec_filename
+ for substr in ['deps', 'streaming_sentiment_analysis'])
+
+ if _python_deps_involved(pipeline_spec_file):
test_yaml_example = pytest.mark.no_xdist(test_yaml_example)
test_yaml_example = unittest.skipIf(
sys.platform == 'win32', "Github virtualenv permissions issues.")(
@@ -457,7 +524,9 @@ def _kafka_test_preprocessor(
'test_pubsub_to_iceberg_yaml',
'test_oracle_to_bigquery_yaml',
'test_mysql_to_bigquery_yaml',
- 'test_spanner_to_bigquery_yaml'
+ 'test_spanner_to_bigquery_yaml',
+ 'test_streaming_sentiment_analysis_yaml',
+ 'test_enrich_spanner_with_bigquery_yaml'
])
def _io_write_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -782,9 +851,68 @@ def _db_io_read_test_processor(
return test_spec
[email protected]_test_preprocessor(
+ 'test_streaming_sentiment_analysis_yaml')
+def _streaming_sentiment_analysis_test_preprocessor(
+ test_spec: dict, expected: List[str], env: TestEnvironment):
+ """
+ Preprocessor for tests that involve the streaming sentiment analysis example.
+
+ This preprocessor replaces several IO transforms and the RunInference
+ transform.
+ This allows the test to verify the pipeline's correctness without relying on
+ external data sources and the model hosted on VertexAI.
+
+ Args:
+ test_spec: The dictionary representation of the YAML pipeline
specification.
+ expected: A list of strings representing the expected output of the
+ pipeline.
+ env: The TestEnvironment object providing utilities for creating temporary
+ files.
+
+ Returns:
+ The modified test_spec dictionary with the IO and RunInference transforms replaced.
+ """
+ if pipeline := test_spec.get('pipeline', None):
+ for transform in pipeline.get('transforms', []):
+ if transform.get('type', '') == 'PyTransform' and transform.get(
+ 'name', '') == 'ReadFromGCS':
+ transform['windowing'] = {'type': 'fixed', 'size': '30s'}
+
+ file_name = 'youtube-comments.csv'
+ local_path = env.input_file(file_name, INPUT_FILES[file_name])
+ transform['config']['kwargs']['file_pattern'] = local_path
+
+ if pipeline := test_spec.get('pipeline', None):
+ for transform in pipeline.get('transforms', []):
+ if transform.get('type', '') == 'ReadFromKafka':
+ config = transform['config']
+ transform['type'] = 'ReadFromCsv'
+ transform['config'] = {
+ k: v
+ for k, v in config.items() if k.startswith('__')
+ }
+ transform['config']['path'] = ""
+
+ file_name = 'youtube-comments.csv'
+ test_spec = replace_recursive(
+ test_spec,
+ transform['type'],
+ 'path',
+ env.input_file(file_name, INPUT_FILES[file_name]))
+
+ if pipeline := test_spec.get('pipeline', None):
+ for transform in pipeline.get('transforms', []):
+ if transform.get('type', '') == 'RunInference':
+ transform['type'] = 'TestRunInference'
+
+ return test_spec
+
+
INPUT_FILES = {
'products.csv': input_data.products_csv(),
- 'kinglear.txt': input_data.text_data()
+ 'kinglear.txt': input_data.text_data(),
+ 'youtube-comments.csv': input_data.youtube_comments_csv()
}
INPUT_TABLES = {
@@ -819,7 +947,7 @@ IOTest = YamlExamplesTestSuite(
'../transforms/io/*.yaml')).run()
MLTest = YamlExamplesTestSuite(
'MLExamplesTest', os.path.join(YAML_DOCS_DIR,
- '../transforms/ml/*.yaml')).run()
+ '../transforms/ml/**/*.yaml')).run()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
index 0c601f67816..34a58c47923 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -54,6 +54,15 @@ def products_csv():
])
+def youtube_comments_csv():
+ return '\n'.join([
+ 'video_id,comment_text,likes,replies',
+ 'XpVt6Z1Gjjo,I AM HAPPY,1,1',
+ 'XpVt6Z1Gjjo,I AM SAD,1,1',
+ 'XpVt6Z1Gjjo,§ÁĐ,1,1'
+ ])
+
+
def spanner_orders_data():
return [{
'order_id': 1,
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
new file mode 100644
index 00000000000..ad44d433017
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
@@ -0,0 +1,93 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## Streaming Sentiment Analysis
+
+The example leverages the `RunInference` transform with Vertex AI
+model handler [VertexAIModelHandlerJSON](
+https://beam.apache.org/releases/pydoc/current/apache_beam.yaml.yaml_ml#apache_beam.yaml.yaml_ml.VertexAIModelHandlerJSONProvider),
+in addition to Kafka IO to demonstrate an end-to-end example of a
+streaming sentiment analysis pipeline. The dataset to perform
+sentiment analysis on is the YouTube video comments dataset and can be found
+on Kaggle [here](
+https://www.kaggle.com/datasets/datasnaek/youtube?select=UScomments.csv).
+
+Download the dataset and copy over to a GCS bucket:
+```sh
+gcloud storage cp /path/to/UScomments.csv gs://YOUR_BUCKET/UScomments.csv
+```
+
+For setting up Kafka, an option is to use [Click to Deploy](
+https://console.cloud.google.com/marketplace/details/click-to-deploy-images/kafka?)
+to quickly launch a Kafka cluster on GCE. See [here](
+../../../README.md#kafka) for more context around using Kafka
+with Dataflow.
+
+A hosted model on Vertex AI is needed before being able to use
+the Vertex AI model handler. One of the current state-of-the-art
+NLP models is HuggingFace's DistilBERT, a distilled version of
+the BERT model that is faster at inference. To deploy DistilBERT on
+Vertex AI, run this [notebook](
+https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/model_garden/model_garden_huggingface_pytorch_inference_deployment.ipynb)
in Colab Enterprise.
+
+BigQuery is the pipeline's sink for the inference result output.
+A BigQuery dataset needs to exist first before the pipeline can
+create/write to a table. Run the following command to create
+a BigQuery dataset:
+
+```sh
+bq --location=us-central1 mk \
+ --dataset DATASET_ID
+```
+See also [here](
+https://cloud.google.com/bigquery/docs/datasets) for more details on
+how to create BigQuery datasets.
+
+The pipeline first reads the YouTube comments .csv dataset from
+GCS bucket and performs some clean-up before writing it to a Kafka
+topic. The pipeline then reads from that Kafka topic and applies
+various transformation logic before `RunInference` transform performs
+remote inference with the Vertex AI model handler and DistilBERT
+deployed to a Vertex AI endpoint. The inference result is then
+parsed and written to a BigQuery table.
+
+Run the pipeline (replace with appropriate variables in the command
+below):
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://YOUR-BUCKET/tmp"
+export REGION="us-central1"
+export JOB_NAME="streaming-sentiment-analysis-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="3"
+
+python -m apache_beam.yaml.main \
+ --yaml_pipeline_file
transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml \
+ --runner DataflowRunner \
+ --temp_location $TEMP_LOCATION \
+ --project $PROJECT \
+ --region $REGION \
+ --num_workers $NUM_WORKERS \
+ --job_name $JOB_NAME \
+ --jinja_variables '{ "GCS_PATH": "gs://YOUR-BUCKET/USComments.csv",
+ "BOOTSTRAP_SERVERS": "BOOTSTRAP_IP_ADD:9092",
+ "TOPIC": "YOUR_TOPIC", "USERNAME": "KAFKA_USERNAME", "PASSWORD":
"KAFKA_PASSWORD",
+ "ENDPOINT": "ENDPOINT_ID", "PROJECT": "PROJECT_ID", "LOCATION": "LOCATION",
+ "DATASET": "DATASET_ID", "TABLE": "TABLE_ID" }'
+```
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
new file mode 100644
index 00000000000..63521208daa
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
@@ -0,0 +1,257 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline first reads the YouTube comments .csv dataset from GCS bucket
+# and performs necessary clean-up before writing it to a Kafka topic.
+# The pipeline then reads from that Kafka topic and applies various
transformation
+# logic before RunInference transform performs remote inference with the
Vertex AI
+# model handler.
+# The inference result is then written to a BigQuery table.
+
+pipeline:
+ transforms:
+ # The YouTube comments dataset contains rows that
+ # have unexpected schema (e.g. rows with more fields,
+ # rows with fields that contain string instead of
+ # integer, etc...). PyTransform helps construct
+ # the logic to properly read in the csv dataset as
+ # a schema'd PCollection.
+ - type: PyTransform
+ name: ReadFromGCS
+ input: {}
+ config:
+ constructor: __callable__
+ kwargs:
+ source: |
+ def ReadYoutubeCommentsCsv(pcoll, file_pattern):
+ def _to_int(x):
+ try:
+ return int(x)
+ except (ValueError):
+ return None
+
+ return (
+ pcoll
+ | beam.io.ReadFromCsv(
+ file_pattern,
+ names=['video_id', 'comment_text', 'likes', 'replies'],
+ on_bad_lines='skip',
+ converters={'likes': _to_int, 'replies': _to_int})
+ | beam.Filter(lambda row:
+ None not in list(row._asdict().values()))
+ | beam.Map(lambda row: beam.Row(
+ video_id=row.video_id,
+ comment_text=row.comment_text,
+ likes=int(row.likes),
+ replies=int(row.replies)))
+ )
+ file_pattern: "{{ GCS_PATH }}"
+
+ # Send the rows as Kafka records to an existing
+ # Kafka topic.
+ - type: WriteToKafka
+ name: SendRecordsToKafka
+ input: ReadFromGCS
+ config:
+ format: "JSON"
+ topic: "{{ TOPIC }}"
+ bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+ producer_config_updates:
+ sasl.jaas.config:
"org.apache.kafka.common.security.plain.PlainLoginModule required \
+ username={{ USERNAME }} \
+ password={{ PASSWORD }};"
+ security.protocol: "SASL_PLAINTEXT"
+ sasl.mechanism: "PLAIN"
+
+ # Read Kafka records from an existing Kafka topic.
+ - type: ReadFromKafka
+ name: ReadFromMyTopic
+ config:
+ format: "JSON"
+ schema: |
+ {
+ "type": "object",
+ "properties": {
+ "video_id": { "type": "string" },
+ "comment_text": { "type": "string" },
+ "likes": { "type": "integer" },
+ "replies": { "type": "integer" }
+ }
+ }
+ topic: "{{ TOPIC }}"
+ bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+ auto_offset_reset_config: earliest
+ consumer_config:
+ sasl.jaas.config:
"org.apache.kafka.common.security.plain.PlainLoginModule required \
+ username={{ USERNAME }} \
+ password={{ PASSWORD }};"
+ security.protocol: "SASL_PLAINTEXT"
+ sasl.mechanism: "PLAIN"
+
+ # Remove unexpected characters from the YouTube
+ # comment string, e.g. emojis and other characters
+ # outside the printable ASCII range.
+ - type: MapToFields
+ name: RemoveWeirdCharacters
+ input: ReadFromMyTopic
+ config:
+ language: python
+ fields:
+ video_id: video_id
+ comment_text:
+ callable: |
+ import re
+ def filter(row):
+ # regex match and keep letters, digits, whitespace and common
punctuations,
+ # i.e. remove non printable ASCII characters (character codes
not in
+ # the range 32 - 126, or \x20 - \x7E).
+ return re.sub(r'[^\x20-\x7E]', '', row.comment_text).strip()
+ likes: likes
+ replies: replies
+
+ # Remove rows that have empty comment text
+ # after previously removing unexpected characters.
+ - type: Filter
+ name: FilterForProperComments
+ input: RemoveWeirdCharacters
+ config:
+ language: python
+ keep:
+ callable: |
+ def filter(row):
+ return len(row.comment_text) > 0
+
+ # HuggingFace's distilbert-base-uncased is used for inference,
+ # which accepts strings with a maximum limit of 250 tokens.
+ # Some of the comment strings can be large and are well over
+ # this limit after tokenization.
+ # This transform truncates the comment string and ensures
+ # every comment satisfies the maximum token limit.
+ - type: MapToFields
+ name: Truncating
+ input: FilterForProperComments
+ config:
+ language: python
+ dependencies:
+ - 'transformers>=4.48.0,<4.49.0'
+ fields:
+ video_id: video_id
+ comment_text:
+ callable: |
+ from transformers import AutoTokenizer
+
+ tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased",
use_fast=True)
+
+ def truncate_sentence(row):
+ tokens = tokenizer.tokenize(row.comment_text)
+ if len(tokens) >= 250:
+ tokens = tokens[:250]
+ truncated_sentence =
tokenizer.convert_tokens_to_string(tokens)
+ else:
+ truncated_sentence = row.comment_text
+
+ return truncated_sentence
+ likes: likes
+ replies: replies
+
+ # HuggingFace's distilbert-base-uncased does not distinguish
+ # between upper and lower case tokens.
+ # This pipeline therefore normalizes the comments by
+ # converting all words into lowercase.
+ - type: MapToFields
+ name: LowerCase
+ input: Truncating
+ config:
+ language: python
+ fields:
+ video_id: video_id
+ comment_text: "comment_text.lower()"
+ likes: likes
+ replies: replies
+
+ # With VertexAIModelHandlerJSON model handler,
+ # RunInference transform performs remote inferences by
+ # sending POST requests to the Vertex AI endpoint that
+ # our distilbert-base-uncased model is being deployed to.
+ - type: RunInference
+ name: DistilBERTRemoteInference
+ input: LowerCase
+ config:
+ inference_tag: "inference"
+ model_handler:
+ type: "VertexAIModelHandlerJSON"
+ config:
+ endpoint_id: "{{ ENDPOINT }}"
+ project: "{{ PROJECT }}"
+ location: "{{ LOCATION }}"
+ preprocess:
+ callable: 'lambda x: x.comment_text'
+
+ # Parse inference results output
+ - type: MapToFields
+ name: FormatInferenceOutput
+ input: DistilBERTRemoteInference
+ config:
+ language: python
+ fields:
+ video_id:
+ expression: video_id
+ output_type: string
+ comment_text:
+ callable: "lambda x: x.comment_text"
+ output_type: string
+ label:
+ callable: "lambda x: x.inference.inference[0]['label']"
+ output_type: string
+ score:
+ callable: "lambda x: x.inference.inference[0]['score']"
+ output_type: number
+ likes:
+ expression: likes
+ output_type: integer
+ replies:
+ expression: replies
+ output_type: integer
+
+ # Assign windows to each element of the unbounded PCollection.
+ - type: WindowInto
+ name: Windowing
+ input: FormatInferenceOutput
+ config:
+ windowing:
+ type: fixed
+ size: 30s
+
+ # Write all inference results to a BigQuery table.
+ - type: WriteToBigQuery
+ name: WriteInferenceResultsToBQ
+ input: Windowing
+ config:
+ table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}"
+ create_disposition: CREATE_IF_NEEDED
+ write_disposition: WRITE_APPEND
+
+options:
+ yaml_experimental_features: ML
+
+# Expected:
+# Row(video_id='XpVt6Z1Gjjo', comment_text='I AM HAPPY', likes=1, replies=1)
+# Row(video_id='XpVt6Z1Gjjo', comment_text='I AM SAD', likes=1, replies=1)
+# Row(video_id='XpVt6Z1Gjjo', comment_text='§ÁĐ', likes=1, replies=1)
+# Row(video_id='XpVt6Z1Gjjo', comment_text='i am happy', label='POSITIVE',
score=0.95, likes=1, replies=1)
+# Row(video_id='XpVt6Z1Gjjo', comment_text='i am sad', label='NEGATIVE',
score=0.95, likes=1, replies=1)