[
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=199477&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-199477
]
ASF GitHub Bot logged work on BEAM-3342:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Feb/19 22:16
Start Date: 15/Feb/19 22:16
Worklog Time Spent: 10m
Work Description: chamikaramj commented on pull request #7737:
[BEAM-3342] Create a Cloud Bigtable Python connector Read
URL: https://github.com/apache/beam/pull/7737#discussion_r257403826
##########
File path: sdks/python/apache_beam/io/gcp/bigtableio.py
##########
@@ -141,3 +147,197 @@ def expand(self, pvalue):
            | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                          beam_options['instance_id'],
                                          beam_options['table_id'])))
+
+
+class _BigTableReadFn(iobase.BoundedSource):
+  """ A bounded source that reads rows from a Bigtable table and emits
+  each row into the Beam pipeline.
+
+  Args:
+    project_id(str): GCP Project ID
+    instance_id(str): GCP Instance ID
+    table_id(str): GCP Table ID
+    row_set(RowSet): A set of row keys and row ranges to read.
+    filter_(RowFilter): Filter to apply to cells in a row.
+  """
+  def __init__(self, project_id, instance_id, table_id,
+               row_set=None, filter_=None):
+    """ Constructor of the Bigtable Read connector.
+
+    Args:
+      project_id(str): GCP project ID of the table to read from
+      instance_id(str): GCP instance ID of the table to read from
+      table_id(str): GCP table ID of the table to read from
+    """
+    super(_BigTableReadFn, self).__init__()
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id,
+                         'row_set': row_set,
+                         'filter_': filter_}
+    self.table = None
+    self.read_row = Metrics.counter(self.__class__.__name__, 'read_row')
+
+  def __getstate__(self):
+    return self.beam_options
+
+  def __setstate__(self, options):
+    self.beam_options = {'project_id': options['project_id'],
+                         'instance_id': options['instance_id'],
+                         'table_id': options['table_id'],
+                         'row_set': options['row_set'],
+                         'filter_': options['filter_']}
+
+  def _getTable(self):
+    if self.table is None:
+      client = Client(project=self.beam_options['project_id'])
+      instance = client.instance(self.beam_options['instance_id'])
+      self.table = instance.table(self.beam_options['table_id'])
+    return self.table
+
+  def estimate_size(self):
+    # The offset of the last sampled row key approximates the total
+    # size of the table in bytes.
+    size = [k.offset_bytes for k in self._getTable().sample_row_keys()][-1]
+    return size
+
+  def get_sample_row_keys(self):
+    return self._getTable().sample_row_keys()
+
+  def get_range_tracker(self, start_position, stop_position):
+    return LexicographicKeyRangeTracker(start_position, stop_position)
+
+  def split(self,
+            desired_bundle_size,
+            start_position=None,
+            stop_position=None):
+
+    if self.beam_options['row_set'] is not None:
+      # Split each row range in the requested row set against the
+      # sampled row keys.
+      for sample_row_key in self.beam_options['row_set'].row_ranges:
+        sample_row_keys = self.get_sample_row_keys()
+        for row_split in self.split_range_size(desired_bundle_size,
+                                               sample_row_keys,
+                                               sample_row_key):
+          yield row_split
+    else:
+      # Walk the sampled row keys, accumulating bytes until roughly
+      # desired_bundle_size is reached, then emit splits for that range.
+      suma = 0
+      last_offset = 0
+      current_size = 0
+
+      start_key = b''
+      end_key = b''
+
+      sample_row_keys = self.get_sample_row_keys()
+      for sample_row_key in sample_row_keys:
+        current_size = sample_row_key.offset_bytes - last_offset
+        if suma >= desired_bundle_size:
+          end_key = sample_row_key.row_key
+          for fraction in self.range_split_fraction(suma,
+                                                    desired_bundle_size,
+                                                    start_key, end_key):
+            yield fraction
+          start_key = sample_row_key.row_key
+
+          suma = 0
+        suma += current_size
+        last_offset = sample_row_key.offset_bytes
+
+  def split_range_size(self, desired_size, sample_row_keys, range_):
+    start, end = None, None
+    l = 0
+    for sample_row in sample_row_keys:
Review comment:
Please add unit tests to reach 100% code coverage for the source (especially
key parts like splitting and size estimation).
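A minimal sketch of what such tests could look like, assuming the connector
in this PR is importable as apache_beam.io.gcp.bigtableio._BigTableReadFn.
The _FakeSampleRowKey and _FakeTable helpers and the concrete offsets are
illustrative only, and range_split_fraction is stubbed so that just the
accumulation logic in split() is exercised, with no real Bigtable RPCs:

import unittest

from apache_beam.io.gcp import bigtableio


class _FakeSampleRowKey(object):
  """Illustrative stand-in for the objects yielded by
  Table.sample_row_keys(), exposing row_key and offset_bytes."""
  def __init__(self, row_key, offset_bytes):
    self.row_key = row_key
    self.offset_bytes = offset_bytes


class _FakeTable(object):
  """Illustrative fake table returning canned sample row keys."""
  def __init__(self, samples):
    self._samples = samples

  def sample_row_keys(self):
    return iter(self._samples)


class BigTableReadFnTest(unittest.TestCase):

  # Three sampled keys, 768 MB apart.
  SAMPLES = [_FakeSampleRowKey(b'key1', 805306368),
             _FakeSampleRowKey(b'key2', 1610612736),
             _FakeSampleRowKey(b'', 2415919104)]

  def _make_source(self):
    source = bigtableio._BigTableReadFn('project', 'instance', 'table')
    # Stub the table lookup and the sampled keys so the tests never
    # issue a real Bigtable request.
    source._getTable = lambda: _FakeTable(self.SAMPLES)
    source.get_sample_row_keys = lambda: iter(self.SAMPLES)
    return source

  def test_estimate_size_uses_last_sample_offset(self):
    source = self._make_source()
    self.assertEqual(source.estimate_size(), 2415919104)

  def test_split_without_row_set_respects_bundle_size(self):
    source = self._make_source()
    recorded = []

    def fake_range_split_fraction(size, desired_size, start, end):
      # Record the ranges the source wants to split further; the real
      # helper is not exercised here.
      recorded.append((size, start, end))
      return iter([(start, end)])

    source.range_split_fraction = fake_range_split_fraction
    # Each sampled key adds 768 MB, so a 768 MB bundle size should
    # trigger a split at every sampled key after the first.
    splits = list(source.split(desired_bundle_size=805306368))
    self.assertEqual(len(splits), 2)
    self.assertEqual(recorded[0][1:], (b'', b'key2'))


if __name__ == '__main__':
  unittest.main()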
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 199477)
> Create a Cloud Bigtable Python connector
> ----------------------------------------
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Solomon Duskis
> Assignee: Solomon Duskis
> Priority: Major
> Labels: triaged
> Time Spent: 16h 50m
> Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)