This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 59499f6  [BEAM-3342] Create a Cloud Bigtable Python connector Write
     new 054d42c  Merge pull request #7367: [BEAM-3342] Create a Cloud Bigtable Python connector Write
59499f6 is described below

commit 59499f682204f51e1a5654973ac9e6084ab6b100
Author: Juan Rael <j...@qlogic.io>
AuthorDate: Tue Jan 29 10:16:44 2019 -0500

    [BEAM-3342] Create a Cloud Bigtable Python connector Write
---
 .../jenkins/dependency_check/generate_report.sh    |   2 +
 .../job_beam_PerformanceTests_Analysis.groovy      |   2 +-
 ownership/PYTHON_DEPENDENCY_OWNERS.yaml            |   6 +
 .../examples/cookbook/bigtableio_it_test.py        | 194 +++++++++++++++++++++
 sdks/python/apache_beam/io/gcp/bigtableio.py       | 143 +++++++++++++++
 sdks/python/container/base_image_requirements.txt  |   2 +
 sdks/python/setup.py                               |   2 +
 sdks/python/tox.ini                                |   5 +-
 8 files changed, 354 insertions(+), 2 deletions(-)

diff --git a/.test-infra/jenkins/dependency_check/generate_report.sh b/.test-infra/jenkins/dependency_check/generate_report.sh
index 13c6a0f..734aa44 100755
--- a/.test-infra/jenkins/dependency_check/generate_report.sh
+++ b/.test-infra/jenkins/dependency_check/generate_report.sh
@@ -42,6 +42,8 @@ REPORT_DESCRIPTION="
 /usr/bin/virtualenv dependency/check
 . dependency/check/bin/activate
 pip install --upgrade google-cloud-bigquery
+pip install --upgrade google-cloud-bigtable
+pip install --upgrade google-cloud-core
 rm -f build/dependencyUpdates/beam-dependency-check-report.txt

 # Install packages and run the unit tests of the report generator and the jira manager
diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy
index 59c347d..efa8353 100644
--- a/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy
@@ -73,7 +73,7 @@ job(testConfiguration.jobName) {
       shell('.env/bin/pip install --upgrade setuptools pip')

       // Install job requirements for analysis script.
-      shell('.env/bin/pip install requests google.cloud.bigquery mock')
+      shell('.env/bin/pip install requests google.cloud.bigquery mock google.cloud.bigtable google.cloud')

       // Launch verification tests before executing script.
       shell('.env/bin/python ' + commonJobProperties.checkoutDir + '/.test-infra/jenkins/verify_performance_test_results_test.py')
diff --git a/ownership/PYTHON_DEPENDENCY_OWNERS.yaml b/ownership/PYTHON_DEPENDENCY_OWNERS.yaml
index 7e46c6e..ba81490 100644
--- a/ownership/PYTHON_DEPENDENCY_OWNERS.yaml
+++ b/ownership/PYTHON_DEPENDENCY_OWNERS.yaml
@@ -42,9 +42,15 @@ deps:
   google-apitools:
     owners:

+  google-cloud-core:
+    owners:
+
   google-cloud-bigquery:
     owners: markflyhigh

+  google-cloud-bigtable:
+    owners:
+
   google-cloud-pubsub:
     owners: markflyhigh
diff --git a/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
new file mode 100644
index 0000000..82e2869
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
@@ -0,0 +1,194 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Integration test for the GCP Bigtable write connector."""
+from __future__ import absolute_import
+
+import datetime
+import logging
+import random
+import string
+import unittest
+import uuid
+
+import pytz
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigtableio import WriteToBigTable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where the Bigtable library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud._helpers import _datetime_from_microseconds
+  from google.cloud._helpers import _microseconds_from_datetime
+  from google.cloud._helpers import UTC
+  from google.cloud.bigtable import row, column_family, Client
+except ImportError:
+  Client = None
+  UTC = pytz.utc
+  _microseconds_from_datetime = lambda label_stamp: label_stamp
+  _datetime_from_microseconds = lambda micro: micro
+
+
+EXISTING_INSTANCES = []
+LABEL_KEY = u'python-bigtable-beam'
+label_stamp = datetime.datetime.utcnow().replace(tzinfo=UTC)
+label_stamp_micros = _microseconds_from_datetime(label_stamp)
+LABELS = {LABEL_KEY: str(label_stamp_micros)}
+
+
+class GenerateTestRows(beam.PTransform):
+  """ A test transform that writes generated rows to a Bigtable table.
+
+  A PTransform that generates a list of `DirectRow` objects and writes
+  them to a Bigtable table.
+
+  """
+  def __init__(self, number, project_id=None, instance_id=None,
+               table_id=None):
+    super(GenerateTestRows, self).__init__()
+    self.number = number
+    self.rand = random.choice(string.ascii_letters + string.digits)
+    self.column_family_id = 'cf1'
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id}
+
+  def _generate(self):
+    value = ''.join(self.rand for i in range(100))
+
+    for index in range(self.number):
+      key = "beam_key%s" % ('{0:07}'.format(index))
+      direct_row = row.DirectRow(row_key=key)
+      for column_id in range(10):
+        direct_row.set_cell(self.column_family_id,
+                            ('field%s' % column_id).encode('utf-8'),
+                            value,
+                            datetime.datetime.now())
+      yield direct_row
+
+  def expand(self, pvalue):
+    beam_options = self.beam_options
+    return (pvalue
+            | beam.Create(self._generate())
+            | WriteToBigTable(beam_options['project_id'],
+                              beam_options['instance_id'],
+                              beam_options['table_id']))
+
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOWriteTest(unittest.TestCase):
+  """ Tests for the Bigtable write connector.
+
+  """
+  DEFAULT_TABLE_PREFIX = "python-test"
+  instance_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  cluster_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  table_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  number = 500
+  LOCATION_ID = "us-east1-b"
+
+  def setUp(self):
+    try:
+      from google.cloud.bigtable import enums
+      self.STORAGE_TYPE = enums.StorageType.HDD
+      self.INSTANCE_TYPE = enums.Instance.Type.DEVELOPMENT
+    except ImportError:
+      self.STORAGE_TYPE = 2
+      self.INSTANCE_TYPE = 2
+
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+    self.client = Client(project=self.project, admin=True)
+
+    self._delete_old_instances()
+
+    self.instance = self.client.instance(self.instance_id,
+                                         instance_type=self.INSTANCE_TYPE,
+                                         labels=LABELS)
+
+    if not self.instance.exists():
+      cluster = self.instance.cluster(self.cluster_id,
+                                      self.LOCATION_ID,
+                                      default_storage_type=self.STORAGE_TYPE)
+      self.instance.create(clusters=[cluster])
+    self.table = self.instance.table(self.table_id)
+
+    if not self.table.exists():
+      max_versions_rule = column_family.MaxVersionsGCRule(2)
+      column_family_id = 'cf1'
+      column_families = {column_family_id: max_versions_rule}
+      self.table.create(column_families=column_families)
+
+  def _delete_old_instances(self):
+    instances = self.client.list_instances()
+    EXISTING_INSTANCES[:] = instances
+
+    def age_in_hours(micros):
+      return (datetime.datetime.utcnow().replace(tzinfo=UTC) - (
+          _datetime_from_microseconds(micros))).total_seconds() // 3600
+    CLEAN_INSTANCE = [i for instance in EXISTING_INSTANCES for i in instance if(
+        LABEL_KEY in i.labels.keys() and
+        (age_in_hours(int(i.labels[LABEL_KEY])) >= 2))]
+
+    if CLEAN_INSTANCE:
+      for instance in CLEAN_INSTANCE:
+        instance.delete()
+
+  def tearDown(self):
+    if self.instance.exists():
+      self.instance.delete()
+
+  def test_bigtable_write(self):
+    number = self.number
+    pipeline_args = self.test_pipeline.options_list
+    pipeline_options = PipelineOptions(pipeline_args)
+
+    pipeline = beam.Pipeline(options=pipeline_options)
+    config_data = {'project_id': self.project,
+                   'instance_id': self.instance_id,
+                   'table_id': self.table_id}
+    _ = (
+        pipeline
+        | 'Generate Direct Rows' >> GenerateTestRows(number, **config_data))
+
+    result = pipeline.run()
+    result.wait_until_finish()
+
+    assert result.state == PipelineState.DONE
+
+    read_rows = self.table.read_rows()
+    assert len([_ for _ in read_rows]) == number
+
+    if not hasattr(result, 'has_job') or result.has_job:
+      read_filter = MetricsFilter().with_name('Written Row')
+      query_result = result.metrics().query(read_filter)
+      if query_result['counters']:
+        read_counter = query_result['counters'][0]
+
+        logging.info('Number of Rows: %d', read_counter.committed)
+        assert read_counter.committed == number
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
new file mode 100644
index 0000000..ccb10c5
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigTable connector
+
+This module implements writing to BigTable tables.
+Rows are written to a table as `DirectRow` mutations; the data model
+they target is described here:
+https://cloud.google.com/bigtable/docs/quickstart-cbt
+
+The BigTable connector can be used as a main output. A main output
+(the common case) is expected to be massive and will be split into
+manageable chunks and processed in parallel. In the example below, a
+collection of `DirectRow` objects (direct_rows) is created with
+beam.Create and then written to the table by WriteToBigTable:
+
+  main_table = (p
+                | beam.Create(direct_rows)
+                | WriteToBigTable(project_id,
+                                  instance_id,
+                                  table_id))
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.display import DisplayDataItem
+
+try:
+  from google.cloud.bigtable import Client
+except ImportError:
+  pass
+
+__all__ = ['WriteToBigTable']
+
+
+class _BigTableWriteFn(beam.DoFn):
+  """ A DoFn that opens a connection to a Bigtable table and adds each
+  incoming row of the pipeline to a mutations batcher.
+
+  Args:
+    project_id(str): GCP Project ID
+    instance_id(str): GCP Instance ID
+    table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id):
+    """ Constructor of the Bigtable write connector
+    Args:
+      project_id(str): GCP project to write the rows to
+      instance_id(str): GCP instance to write the rows to
+      table_id(str): GCP table to write the `DirectRows` to
+    """
+    super(_BigTableWriteFn, self).__init__()
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id}
+    self.table = None
+    self.batcher = None
+    self.written = Metrics.counter(self.__class__, 'Written Row')
+
+  def __getstate__(self):
+    return self.beam_options
+
+  def __setstate__(self, options):
+    self.beam_options = options
+    self.table = None
+    self.batcher = None
+    self.written = Metrics.counter(self.__class__, 'Written Row')
+
+  def start_bundle(self):
+    if self.table is None:
+      client = Client(project=self.beam_options['project_id'])
+      instance = client.instance(self.beam_options['instance_id'])
+      self.table = instance.table(self.beam_options['table_id'])
+    self.batcher = self.table.mutations_batcher()
+
+  def process(self, row):
+    self.written.inc()
+    # The timestamp must be set on the cells in this row object: a retry
+    # mutates the same object again, and the explicit timestamp ensures
+    # the cells are set to the new values.
+    # Example:
+    # direct_row.set_cell('cf1',
+    #                     'field1',
+    #                     'value1',
+    #                     timestamp=datetime.datetime.now())
+    self.batcher.mutate(row)
+
+  def finish_bundle(self):
+    self.batcher.flush()
+    self.batcher = None
+
+  def display_data(self):
+    return {'projectId': DisplayDataItem(self.beam_options['project_id'],
+                                         label='Bigtable Project Id'),
+            'instanceId': DisplayDataItem(self.beam_options['instance_id'],
+                                          label='Bigtable Instance Id'),
+            'tableId': DisplayDataItem(self.beam_options['table_id'],
+                                       label='Bigtable Table Id')
+           }
+
+
+class WriteToBigTable(beam.PTransform):
+  """ A transform to write to the Bigtable Table.
+
+  A PTransform that writes a list of `DirectRow` objects into the
+  Bigtable table.
+
+  """
+  def __init__(self, project_id=None, instance_id=None,
+               table_id=None):
+    """ The PTransform to access the Bigtable write connector
+    Args:
+      project_id(str): GCP project to write the rows to
+      instance_id(str): GCP instance to write the rows to
+      table_id(str): GCP table to write the `DirectRows` to
+    """
+    super(WriteToBigTable, self).__init__()
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id}
+
+  def expand(self, pvalue):
+    beam_options = self.beam_options
+    return (pvalue
+            | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
+                                          beam_options['instance_id'],
+                                          beam_options['table_id'])))
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index 05ed780..1f8c41b 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -50,6 +50,8 @@ googledatastore==7.0.1
 google-cloud-pubsub==0.39.0
 google-cloud-bigquery==1.6.0
 proto-google-cloud-datastore-v1==0.90.4
+google-cloud-bigtable==0.31.1
+google-cloud-core==0.28.1

 # Optional packages
 cython==0.28.1
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index d2f592f..7078646 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -142,6 +142,8 @@ GCP_REQUIREMENTS = [
     'google-cloud-pubsub==0.39.0',
     # GCP packages required by tests
     'google-cloud-bigquery>=1.6.0,<1.7.0',
+    'google-cloud-core==0.28.1',
+    'google-cloud-bigtable==0.31.1',
 ]
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index e46bcde..040b71f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -58,13 +58,16 @@ setenv =
   BEAM_EXPERIMENTAL_PY3=1
   RUN_SKIPPED_PY3_TESTS=0
 modules =
-  apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
+  apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
 commands =
   python --version
   pip --version
   {toxinidir}/scripts/run_tox_cleanup.sh
   python setup.py nosetests --tests {[testenv:py3]modules}
   {toxinidir}/scripts/run_tox_cleanup.sh
+deps =
+  google-cloud-core==0.28.1
+  google-cloud-bigtable==0.31.1

 [testenv:py27-cython]
 # cython tests are only expected to work in linux (2.x and 3.x)
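
For reviewers trying out the new transform, here is a minimal usage
sketch. It is not part of the commit: 'my-project', 'my-instance',
'my-table', and the 'cf1' column family are placeholder names, the
table is assumed to already exist, and the google-cloud-bigtable
0.31.x `DirectRow` API is assumed.

    import datetime

    import apache_beam as beam
    from apache_beam.io.gcp.bigtableio import WriteToBigTable
    from google.cloud.bigtable import row

    def build_rows(count):
      # Build DirectRow mutations; an explicit timestamp is set on each
      # cell so that retried bundles rewrite the same cell values.
      for index in range(count):
        direct_row = row.DirectRow(
            row_key=('key%07d' % index).encode('utf-8'))
        direct_row.set_cell('cf1',      # column family (placeholder)
                            b'field0',  # column qualifier
                            b'value',   # cell value
                            datetime.datetime.now())
        yield direct_row

    with beam.Pipeline() as p:
      _ = (p
           | beam.Create(build_rows(100))
           | WriteToBigTable(project_id='my-project',
                             instance_id='my-instance',
                             table_id='my-table'))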
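The write path itself is thin: each bundle buffers mutations through the
client library's mutations batcher. Roughly, what _BigTableWriteFn does
per bundle looks like the following sketch outside Beam (same placeholder
ids as above; direct_rows stands in for the elements of a bundle):

    from google.cloud.bigtable import Client

    client = Client(project='my-project')
    table = client.instance('my-instance').table('my-table')

    batcher = table.mutations_batcher()  # start_bundle()
    for direct_row in direct_rows:       # process(), one call per element
      batcher.mutate(direct_row)         # buffered, sent in batches
    batcher.flush()                      # finish_bundle()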