[
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=199474&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-199474
]
ASF GitHub Bot logged work on BEAM-3342:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Feb/19 22:16
Start Date: 15/Feb/19 22:16
Worklog Time Spent: 10m
Work Description: chamikaramj commented on pull request #7737:
[BEAM-3342] Create a Cloud Bigtable Python connector Read
URL: https://github.com/apache/beam/pull/7737#discussion_r257076956
##########
File path: sdks/python/apache_beam/io/gcp/bigtableio.py
##########
@@ -141,3 +147,197 @@ def expand(self, pvalue):
| beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
beam_options['instance_id'],
beam_options['table_id'])))
+
+
+class _BigTableReadFn(iobase.BoundedSource):
+  """A bounded source that reads rows from a Bigtable table and makes each
+  row available to the Beam pipeline.
+
+  Args:
+    project_id(str): GCP Project ID
+    instance_id(str): GCP Instance ID
+    table_id(str): GCP Table ID
+    row_set(RowSet): A set of row keys and row ranges to read.
+    filter_(RowFilter): Filter to apply to cells in a row.
+  """
+  def __init__(self, project_id, instance_id, table_id,
+               row_set=None, filter_=None):
+    """Constructor of the Bigtable read connector.
+
+    Args:
+      project_id(str): GCP project containing the rows to read.
+      instance_id(str): GCP instance containing the rows to read.
+      table_id(str): GCP table to read the rows from.
+    """
+    super(_BigTableReadFn, self).__init__()
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id,
+                         'row_set': row_set,
+                         'filter_': filter_}
+    self.table = None
+    self.read_row = Metrics.counter(self.__class__.__name__, 'read_row')
+
+  def __getstate__(self):
+    return self.beam_options
+
+  def __setstate__(self, options):
+    self.beam_options = options
+    # Attributes dropped by __getstate__ must be re-initialized here,
+    # otherwise the unpickled source would be missing them.
+    self.table = None
+    self.read_row = Metrics.counter(self.__class__.__name__, 'read_row')
+
+  def _getTable(self):
+    if self.table is None:
+      client = Client(project=self.beam_options['project_id'])
+      instance = client.instance(self.beam_options['instance_id'])
+      self.table = instance.table(self.beam_options['table_id'])
+    return self.table
+
+  def estimate_size(self):
+    size = [k.offset_bytes for k in self._getTable().sample_row_keys()][-1]
Review comment:
Should this (the sample size) be multiplied by some factor to get the
estimated size of the source?
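For context on the question above, here is a minimal sketch (not the connector itself) of how the sample-based estimate works. The real `Table.sample_row_keys()` yields samples whose `offset_bytes` field is a cumulative byte offset into the table, so the last sample already approximates the full table size; `FakeSample` below is a hypothetical stand-in for the real sample objects.

```python
# Hypothetical stand-in for the samples returned by Table.sample_row_keys();
# offset_bytes is cumulative, so the last sample covers the whole table.
class FakeSample(object):
    def __init__(self, row_key, offset_bytes):
        self.row_key = row_key
        self.offset_bytes = offset_bytes

samples = [FakeSample(b'key-a', 128 * 1024),
           FakeSample(b'key-m', 512 * 1024),
           FakeSample(b'', 1024 * 1024)]  # empty key marks end of table

# Mirrors `[k.offset_bytes for k in ...][-1]` in the diff above.
estimated_size = [s.offset_bytes for s in samples][-1]
print(estimated_size)  # 1048576
```

If this reading of `offset_bytes` is right, a multiplier would only matter when the source is restricted to a `row_set` smaller than the full table.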
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 199474)
Time Spent: 16.5h (was: 16h 20m)
> Create a Cloud Bigtable Python connector
> ----------------------------------------
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Solomon Duskis
> Assignee: Solomon Duskis
> Priority: Major
> Labels: triaged
> Time Spent: 16.5h
> Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)