Repository: beam
Updated Branches:
  refs/heads/master 7019aa70d -> e26bfbe0a
Remove bigshuffle from python examples

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/815bbdcc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/815bbdcc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/815bbdcc

Branch: refs/heads/master
Commit: 815bbdccc6fd9f0cfcd3b9fb86476b61eb08b293
Parents: 7019aa7
Author: Vikas Kedigehalli <[email protected]>
Authored: Wed Apr 19 17:12:52 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Thu Apr 20 08:57:06 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/cookbook/bigshuffle.py | 94 --------------------
 .../examples/cookbook/bigshuffle_test.py | 63 -------------
 2 files changed, 157 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/815bbdcc/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
deleted file mode 100644
index 79cc85c..0000000
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""A BigShuffle workflow.""" - -from __future__ import absolute_import - -import argparse -import binascii -import logging - - -import apache_beam as beam -from apache_beam.io import ReadFromText -from apache_beam.io import WriteToText -from apache_beam.utils.pipeline_options import PipelineOptions -from apache_beam.utils.pipeline_options import SetupOptions - - -def crc32line(line): - return binascii.crc32(line) & 0xffffffff - - -def run(argv=None): - # pylint: disable=expression-not-assigned - - parser = argparse.ArgumentParser() - parser.add_argument('--input', - required=True, - help='Input file pattern to process.') - parser.add_argument('--output', - required=True, - help='Output file pattern to write results to.') - parser.add_argument('--checksum_output', - help='Checksum output file pattern.') - known_args, pipeline_args = parser.parse_known_args(argv) - # We use the save_main_session option because one or more DoFn's in this - # workflow rely on global context (e.g., a module imported at module level). - pipeline_options = PipelineOptions(pipeline_args) - pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Read the text file[pattern] into a PCollection. - lines = p | ReadFromText(known_args.input, coder=beam.coders.BytesCoder()) - - # Count the occurrences of each word. 
- output = (lines - | 'split' >> beam.Map( - lambda x: (x[:10], x[10:99])) - .with_output_types(beam.typehints.KV[str, str]) - | 'group' >> beam.GroupByKey() - | 'format' >> beam.FlatMap( - lambda (key, vals): ['%s%s' % (key, val) for val in vals])) - - # Write the output using a "Write" transform that has side effects. - output | WriteToText(known_args.output) - - # Optionally write the input and output checksums. - if known_args.checksum_output: - input_csum = (lines - | 'input-csum' >> beam.Map(crc32line) - | 'combine-input-csum' >> beam.CombineGlobally(sum) - | 'hex-format' >> beam.Map(lambda x: '%x' % x)) - input_csum | 'write-input-csum' >> WriteToText( - known_args.checksum_output + '-input') - - output_csum = (output - | 'output-csum' >> beam.Map(crc32line) - | 'combine-output-csum' >> beam.CombineGlobally(sum) - | 'hex-format-output' >> beam.Map(lambda x: '%x' % x)) - output_csum | 'write-output-csum' >> WriteToText( - known_args.checksum_output + '-output') - - # Actually run the pipeline (all operations above are deferred). - return p.run() - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - run() http://git-wip-us.apache.org/repos/asf/beam/blob/815bbdcc/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py deleted file mode 100644 index 60b6acc..0000000 --- a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py +++ /dev/null @@ -1,63 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Test for the bigshuffle example.""" - -import logging -import tempfile -import unittest - -from apache_beam.examples.cookbook import bigshuffle - - -class BigShuffleTest(unittest.TestCase): - - SAMPLE_TEXT = 'a b c a b a\naa bb cc aa bb aa' - - def create_temp_file(self, contents): - with tempfile.NamedTemporaryFile(delete=False) as f: - f.write(contents) - return f.name - - def test_basics(self): - temp_path = self.create_temp_file(self.SAMPLE_TEXT) - bigshuffle.run([ - '--input=%s*' % temp_path, - '--output=%s.result' % temp_path, - '--checksum_output=%s.checksum' % temp_path]).wait_until_finish() - # Parse result file and compare. - results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: - for line in result_file: - results.append(line.strip()) - expected = self.SAMPLE_TEXT.split('\n') - self.assertEqual(sorted(results), sorted(expected)) - # Check the checksums - input_csum = '' - with open(temp_path + '.checksum-input-00000-of-00001') as input_csum_file: - input_csum = input_csum_file.read().strip() - output_csum = '' - with open(temp_path + - '.checksum-output-00000-of-00001') as output_csum_file: - output_csum = output_csum_file.read().strip() - expected_csum = 'd629c1f6' - self.assertEqual(input_csum, expected_csum) - self.assertEqual(input_csum, output_csum) - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main()
