AnandInguva commented on code in PR #24382: URL: https://github.com/apache/beam/pull/24382#discussion_r1062584287
########## sdks/python/apache_beam/testing/benchmarks/cloudml/cloudml_benchmark_test.py: ########## @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import unittest +import uuid + +import pytest + +try: + import apache_beam.testing.benchmarks.cloudml.cloudml_benchmark_constants_lib as lib + from apache_beam.testing.benchmarks.cloudml.pipelines import workflow + from apache_beam.testing.test_pipeline import TestPipeline +except ImportError: # pylint: disable=bare-except + raise unittest.SkipTest('Dependencies are not installed') + +_INPUT_GCS_BUCKET_ROOT = 'gs://apache-beam-ml/datasets/cloudml/criteo' +_CRITEO_FEATURES_FILE = 'testdata/criteo/expected/features.tfrecord.gz' +_OUTPUT_GCS_BUKCET_ROOT = 'gs://temp-storage-for-end-to-end-tests/tft/' Review Comment: Thanks for catching ########## sdks/python/apache_beam/testing/benchmarks/cloudml/cloudml_benchmark_constants_lib.py: ########## @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A common file for CloudML benchmarks. + +This file contains constants for pipeline paths, dependency locations and +test data paths. +""" + +INPUT_CRITEO_SMALL = 'train10.tsv' +INPUT_CRITEO_SMALL_100MB = '100mb/train.txt' +INPUT_CRITEO_10GB = '10gb/train.txt' + +# The model is trained by running the criteo preprocessing and training. +# The input dataset was the Criteo 10GB dataset and frequency_threshold=100 was +# set for categorical features. +# MODEL_CRITEO_10GB = 'testdata/criteo/saved_model' Review Comment: As of now, no. These are used by TFMA tests. ########## sdks/python/apache_beam/testing/benchmarks/cloudml/pipelines/workflow.py: ########## @@ -0,0 +1,223 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +import argparse +import logging +import os + +import apache_beam as beam +import tensorflow_transform as tft +import tensorflow_transform.beam as tft_beam +from apache_beam.testing.benchmarks.cloudml.criteo_tft import criteo +from tensorflow_transform import coders +from tensorflow_transform.tf_metadata import dataset_metadata +from tensorflow_transform.tf_metadata import schema_utils +from tfx_bsl.public import tfxio + +# Name of the column for the synthetic version of the benchmark. +_SYNTHETIC_COLUMN = 'x' + + +class _RecordBatchToPyDict(beam.PTransform): + """Converts PCollections of pa.RecordBatch to python dicts.""" + def __init__(self, input_feature_spec): + self._input_feature_spec = input_feature_spec + + def expand(self, pcoll): + def format_values(instance): + return { + k: v.squeeze(0).tolist() + if v is not None else self._input_feature_spec[k].default_value + for k, + v in instance.items() + } + + return ( + pcoll + | 'RecordBatchToDicts' >> + beam.FlatMap(lambda x: x.to_pandas().to_dict(orient='records')) + | 'FormatPyDictValues' >> beam.Map(format_values)) + + +def _synthetic_preprocessing_fn(inputs): + return { + _SYNTHETIC_COLUMN: tft.compute_and_apply_vocabulary( + inputs[_SYNTHETIC_COLUMN], + + # Execute more codepaths but do no frequency filtration. + frequency_threshold=1, + + # Execute more codepaths but do no top filtration. + top_k=2**31 - 1, + + # Execute more codepaths + num_oov_buckets=10) + } + + +class _PredictionHistogramFn(beam.DoFn): + def __init__(self): + # Beam Metrics API for Distributions only works with integers but + # predictions are floating point numbers. We thus store a "quantized" + # distribution of the prediction with sufficient granularity and for ease + # of human interpretation (eg as a percentage for logistic regression). 
+ self._prediction_distribution = beam.metrics.Metrics.distribution( + self.__class__, 'int(scores[0]*100)') + + def process(self, element): + self._prediction_distribution.update(int(element['scores'][0] * 100)) + + +def setup_pipeline(p, args): + if args.classifier == 'criteo': + input_feature_spec = criteo.make_input_feature_spec() + input_schema = schema_utils.schema_from_feature_spec(input_feature_spec) + input_tfxio = tfxio.BeamRecordCsvTFXIO( + physical_format='text', + column_names=criteo.make_ordered_column_names(), + schema=input_schema, + delimiter=criteo.DEFAULT_DELIMITER, + telemetry_descriptors=['CriteoCloudMLBenchmark']) + preprocessing_fn = criteo.make_preprocessing_fn(args.frequency_threshold) + else: + assert False, 'Unknown args classifier <{}>'.format(args.classifier) + + input_data = p | 'ReadFromText' >> beam.io.textio.ReadFromText( + args.input, coder=beam.coders.BytesCoder()) + + if args.benchmark_type == 'tft': + logging.info('TFT benchmark') + + # Setting TFXIO output format only for Criteo benchmarks to make sure that + # both codepaths are covered. + output_record_batches = args.classifier == 'criteo' + + # pylint: disable=expression-not-assigned + input_metadata = dataset_metadata.DatasetMetadata(schema=input_schema) + ( + input_metadata + | 'WriteInputMetadata' >> tft_beam.WriteMetadata( + os.path.join(args.output, 'raw_metadata'), pipeline=p)) + + with tft_beam.Context(temp_dir=os.path.join(args.output, 'tmp'), + use_deep_copy_optimization=True): + decoded_input_data = ( + input_data | 'DecodeForAnalyze' >> input_tfxio.BeamSource()) + transform_fn = ((decoded_input_data, input_tfxio.TensorAdapterConfig()) + | 'Analyze' >> tft_beam.AnalyzeDataset(preprocessing_fn)) + + if args.shuffle: + # Shuffle the data before any decoding (more compact representation). 
+ input_data |= 'Shuffle' >> beam.transforms.Reshuffle() # pylint: disable=no-value-for-parameter + + decoded_input_data = ( + input_data | 'DecodeForTransform' >> input_tfxio.BeamSource()) + (dataset, + metadata) = ((decoded_input_data, input_tfxio.TensorAdapterConfig()), + transform_fn) | 'Transform' >> tft_beam.TransformDataset( + output_record_batches=output_record_batches) + + if output_record_batches: + + def record_batch_to_examples(batch, unary_passthrough_features): + """Encodes transformed data as tf.Examples.""" + # Ignore unary pass-through features. + del unary_passthrough_features + # From beam: "imports, functions and other variables defined in the + # global context of your __main__ file of your Dataflow pipeline are, by + # default, not available in the worker execution environment, and such + # references will cause a NameError, unless the --save_main_session + # pipeline option is set to True. Please see + # https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors ." + from tfx_bsl.coders.example_coder import RecordBatchToExamples + return RecordBatchToExamples(batch) + + encode_ptransform = beam.FlatMapTuple(record_batch_to_examples) + else: + example_coder = coders.ExampleProtoCoder(metadata.schema) + encode_ptransform = beam.Map(example_coder.encode) + + # TODO: Use WriteDataset instead when it becomes available. + ( + dataset + | 'Encode' >> encode_ptransform + | 'Write' >> beam.io.WriteToTFRecord( + os.path.join(args.output, 'features_train'), + file_name_suffix='.tfrecord.gz')) + # transform_fn | beam.Map(print) + transform_fn | 'WriteTransformFn' >> tft_beam.WriteTransformFn(args.output) + + # TODO: Remember to eventually also save the statistics. 
+ else: + logging.fatal('Unknown benchmark type: %s', args.benchmark_type) + + +def parse_known_args(argv): + """Parses args for this workflow.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', + dest='input', + required=True, + help='Input path for input files.') + parser.add_argument( + '--output', + dest='output', + required=True, + help='Output path for output files.') + parser.add_argument( Review Comment: No, I will remove it ########## .test-infra/jenkins/job_BenchmarkTests_CloudML_Python.groovy: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties +import PhraseTriggeringPostCommitBuilder +import CronJobBuilder + +def cloudMLJob = { scope -> + scope.description('Runs the TFT Criteo Examples on the Dataflow runner.') + + // Set common parameters. + commonJobProperties.setTopLevelMainJobProperties(scope) + + // Gradle goals for this job. + scope.steps { + gradle { + rootBuildScriptDir(commonJobProperties.checkoutDir) + commonJobProperties.setGradleSwitches(delegate, 'master', 360) + tasks(':sdks:python:test-suites:dataflow:tftTests') + } + } +} Review Comment: I am not sure. These run for 4 hours. 
I thought running them once daily as a cron job would be good enough, but I can add a trigger command. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
