damccorm commented on code in PR #27620:
URL: https://github.com/apache/beam/pull/27620#discussion_r1283322831
##########
sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py:
##########
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import os
+import time
+import unittest
+import uuid
+
+import pytest
+
+try:
+  from apache_beam.examples.ml_transform import vocab_tfidf_processing
+  from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
+  from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader
+  from apache_beam.testing.test_pipeline import TestPipeline
+except ImportError:
+  raise unittest.SkipTest('tensorflow_transform is not installed.')
+
+_OUTPUT_GCS_BUCKET_ROOT = 'gs://temp-storage-for-end-to-end-tests/tft/'
+
+
+def _publish_metrics(pipeline, metric_value, metrics_table, metric_name):
+  influx_options = InfluxDBMetricsPublisherOptions(
+      metrics_table,
+      pipeline.get_option('influx_db_name'),
+      pipeline.get_option('influx_hostname'),
+      os.getenv('INFLUXDB_USER'),
+      os.getenv('INFLUXDB_USER_PASSWORD'),
+  )
+  metric_reader = MetricsReader(
+      project_name=pipeline.get_option('project'),
+      bq_table=metrics_table,
+      bq_dataset=pipeline.get_option('metrics_dataset'),
+      publish_to_bq=True,
+      influxdb_options=influx_options,
+  )
+  metric_reader.publish_values([(
+      metric_name,
+      metric_value,
+  )])
+
+
[email protected]_tft
+class LargeMovieReviewDatasetProcessTest(unittest.TestCase):
+  def test_process_large_movie_review_dataset(self):
+    input_data_dir = 'gs://apache-beam-ml/datasets/aclImdb'
+    artifact_location = os.path.join(_OUTPUT_GCS_BUCKET_ROOT, uuid.uuid4().hex)
+    output_dir = os.path.join(_OUTPUT_GCS_BUCKET_ROOT, uuid.uuid4().hex)
+    extra_opts = {
+        'input_data_dir': input_data_dir,
+        'output_dir': output_dir,
+        'artifact_location': artifact_location,
+    }
+
+    extra_opts['job_name'] = 'mltransform-large-movie-review-dataset-{}'.format(
+        uuid.uuid4().hex)
+
+    test_pipeline = TestPipeline(is_integration_test=True)
+    start_time = time.time()
+    vocab_tfidf_processing.run(
+        test_pipeline.get_full_options_as_args(
+            **extra_opts, save_main_session=False),
+    )
+    end_time = time.time()

Review Comment:
   Should we be doing any validation here? e.g., that the output file was uploaded, the artifact was uploaded, and the results are correct?
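   For example (just a sketch, not asking for anything this specific — `FileSystems.match` is the real Beam API, but the glob patterns and what counts as "results are correct" here are assumptions about what the pipeline writes):

```python
import os

from apache_beam.io.filesystems import FileSystems


def _assert_files_written(pattern):
  # FileSystems.match returns one MatchResult per input pattern; an empty
  # metadata_list means nothing was written at that location.
  match_results = FileSystems.match([pattern])
  assert match_results[0].metadata_list, 'no files matched %s' % pattern


# After vocab_tfidf_processing.run(...) in the test body (output_dir and
# artifact_location are the GCS paths the test already constructs):
_assert_files_written(os.path.join(output_dir, '*'))         # transformed output
_assert_files_written(os.path.join(artifact_location, '*'))  # TFT artifacts
```

   Checking the results themselves could be as light as reading one output shard back and asserting it has the expected (vocab_index, tfidf_score) structure.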
##########
sdks/python/apache_beam/examples/ml_transform/vocab_tfidf_processing.py:
##########
@@ -0,0 +1,190 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This example uses the Large Movie Review Dataset
+(http://ai.stanford.edu/~amaas/data/sentiment/) to preprocess the input
+text data and generate TF-IDF scores for each word of the input text.
+
+Workflow:
+1. The input text is split into words using a delimiter.
+2. The words are converted to a vocabulary index using
+   ComputeAndApplyVocabulary.
+3. The vocabulary index is converted to TF-IDF scores using TFIDF.
+4. The output of the pipeline is a tuple of
+   (input_text, [(vocab_index, tfidf_score)]).
+
+To run this pipeline, download the Large Movie Review Dataset and
+place it in a directory. Pass the directory path to --input_data_dir.
+The pipeline reads the data from the directory and writes the
+transformed data to --output_dir. Artifacts, such as the vocabulary
+file generated by ComputeAndApplyVocabulary, are saved to
+--artifact_location.
+
+In this pipeline, we only preprocess the train data (25,000 samples).
+"""
+
+import argparse
+import logging
+import os
+
+import apache_beam as beam
+from apache_beam.ml.transforms.base import MLTransform
+from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
+from apache_beam.ml.transforms.tft import TFIDF
+
+RAW_DATA_KEY = 'raw_data'
+REVIEW_COLUMN = 'review'
+LABEL_COLUMN = 'label'
+DELIMITERS = '.,!?() '
+VOCAB_SIZE = 20000
+
+
+# pylint: disable=invalid-name
[email protected]_fn
+def Shuffle(pcoll):
+  """Shuffles a PCollection. Collection should not contain duplicates."""
+  return (
+      pcoll
+      | 'PairWithHash' >> beam.Map(lambda x: (hash(x), x))
+      | 'GroupByHash' >> beam.GroupByKey()
+      | 'DropHash' >> beam.FlatMap(lambda hash_and_values: hash_and_values[1]))
+
+
+class ReadAndShuffleData(beam.PTransform):
+  def __init__(self, pos_file_pattern, neg_file_pattern):
+    self.pos_file_pattern = pos_file_pattern
+    self.neg_file_pattern = neg_file_pattern
+
+  def expand(self, pcoll):
+    negative_examples = (
+        pcoll
+        | 'ReadNegativeExample' >> beam.io.ReadFromText(self.neg_file_pattern)
+        | 'PairWithZero' >> beam.Map(lambda review: (review, 0)))
+
+    positive_examples = (
+        pcoll
+        | 'ReadPositiveExample' >> beam.io.ReadFromText(self.pos_file_pattern)
+        | 'PairWithOne' >> beam.Map(lambda review: (review, 1)))
+
+    all_examples = ((negative_examples, positive_examples)
+                    | 'FlattenPColls' >> beam.Flatten())
+
+    shuffled_examples = (
+        all_examples
+        | 'Distinct' >> beam.Distinct()

Review Comment:
   Nit: should we push the Distinct to each of the branches (negative_examples, positive_examples)? I'd expect it to be more efficient that way, since the set of previously seen examples will be smaller for the average comparison and Flatten will need to do less work.
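   Something like this (a sketch — it should be behavior-preserving, since the post-Flatten Distinct can never merge across branches anyway: (review, 0) and (review, 1) are already distinct tuples):

```python
negative_examples = (
    pcoll
    | 'ReadNegativeExample' >> beam.io.ReadFromText(self.neg_file_pattern)
    | 'PairWithZero' >> beam.Map(lambda review: (review, 0))
    | 'DistinctNegative' >> beam.Distinct())

positive_examples = (
    pcoll
    | 'ReadPositiveExample' >> beam.io.ReadFromText(self.pos_file_pattern)
    | 'PairWithOne' >> beam.Map(lambda review: (review, 1))
    | 'DistinctPositive' >> beam.Distinct())

# Flatten now merges branches that were already deduplicated; no
# cross-branch Distinct is needed because the differing labels keep
# the tuples distinct.
all_examples = ((negative_examples, positive_examples)
                | 'FlattenPColls' >> beam.Flatten())
```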
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]