chamikaramj commented on code in PR #26593:
URL: https://github.com/apache/beam/pull/26593#discussion_r1233149294
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -100,60 +100,55 @@ public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_TAG);
}
- /** Configuration for reading from BigTable. */
+ /** Configuration for reading from Bigtable. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
- public abstract static class BigTableReadSchemaTransformConfiguration {
- /** Instantiates a {@link
BigTableReadSchemaTransformConfiguration.Builder} instance. */
+ public abstract static class BigtableReadSchemaTransformConfiguration {
+ /** Instantiates a {@link
BigtableReadSchemaTransformConfiguration.Builder} instance. */
+ public void validate() {
+ String emptyStringMessage =
+ "Invalid Bigtable Read configuration: %s should not be a non-empty
String";
+ checkArgument(!this.getTableId().isEmpty(),
String.format(emptyStringMessage, "table"));
+ checkArgument(!this.getInstanceId().isEmpty(),
String.format(emptyStringMessage, "instance"));
+ checkArgument(!this.getProjectId().isEmpty(),
String.format(emptyStringMessage, "project"));
+ }
+
public static Builder builder() {
- return new
AutoValue_BigTableReadSchemaTransformProvider_BigTableReadSchemaTransformConfiguration
+ return new
AutoValue_BigtableReadSchemaTransformProvider_BigtableReadSchemaTransformConfiguration
.Builder();
}
- public abstract String getTable();
+ public abstract String getTableId();
- public abstract String getInstance();
+ public abstract String getInstanceId();
- public abstract String getProject();
+ public abstract String getProjectId();
- /** Builder for the {@link BigTableReadSchemaTransformConfiguration}. */
+ /** Builder for the {@link BigtableReadSchemaTransformConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
- public abstract Builder setTable(String table);
-
- public abstract Builder setInstance(String instance);
-
- public abstract Builder setProject(String project);
-
- abstract BigTableReadSchemaTransformConfiguration autoBuild();
+ public abstract Builder setTableId(String table);
Review Comment:
Nit: rename the setter parameters to match the setter names (tableId,
instanceId, projectId).
##########
sdks/python/apache_beam/io/gcp/bigtableio.py:
##########
@@ -227,3 +231,75 @@ def expand(self, pvalue):
beam_options['project_id'],
beam_options['instance_id'],
beam_options['table_id'])))
+
+
+class ReadFromBigtable(PTransform):
+ """Reads rows from Bigtable.
+
+ Returns a PCollection of PartialRowData objects, each representing a
+ Bigtable row. For more information about this row object, visit
+
https://cloud.google.com/python/docs/reference/bigtable/latest/row#class-googlecloudbigtablerowpartialrowdatarowkey
+ """
+ URN = "beam:schematransform:org.apache.beam:bigtable_read:v1"
+
+ def __init__(self, table_id, instance_id, project_id,
expansion_service=None):
+ """Initialize a ReadFromBigtable transform.
+
+ :param table_id:
+ The ID of the table to read from.
+ :param instance_id:
+ The ID of the instance where the table resides.
+ :param project_id:
+ The GCP project ID.
+ :param expansion_service:
+ The address of the expansion service. If no expansion service is
+ provided, will attempt to run the default GCP expansion service.
+ """
+ super().__init__()
+ self._table_id = table_id
+ self._instance_id = instance_id
+ self._project_id = project_id
+ self._expansion_service = (
+ expansion_service or BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build'))
+ self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+ self._expansion_service, self.URN)
+
+ def expand(self, input):
+ external_read = SchemaAwareExternalTransform(
+ identifier=self.schematransform_config.identifier,
+ expansion_service=self._expansion_service,
+ rearrange_based_on_discovery=True,
+ tableId=self._table_id,
+ instanceId=self._instance_id,
+ projectId=self._project_id)
+
+ return (
+ input.pipeline
+ | external_read
+ | beam.ParDo(self._BeamRowToPartialRowData()))
+
+ # PartialRowData has some useful methods for querying data within a row.
+ # To make use of those methods and to give Python users a more familiar
+ # object, we process each Beam Row and return a PartialRowData equivalent.
+ class _BeamRowToPartialRowData(beam.DoFn):
Review Comment:
Let's add unit tests to make sure that this conversion is correct for
various data types etc.
##########
sdks/python/apache_beam/io/gcp/bigtableio.py:
##########
@@ -227,3 +231,75 @@ def expand(self, pvalue):
beam_options['project_id'],
beam_options['instance_id'],
beam_options['table_id'])))
+
+
+class ReadFromBigtable(PTransform):
+ """Reads rows from Bigtable.
+
+ Returns a PCollection of PartialRowData objects, each representing a
+ Bigtable row. For more information about this row object, visit
+
https://cloud.google.com/python/docs/reference/bigtable/latest/row#class-googlecloudbigtablerowpartialrowdatarowkey
+ """
+ URN = "beam:schematransform:org.apache.beam:bigtable_read:v1"
+
+ def __init__(self, table_id, instance_id, project_id,
expansion_service=None):
+ """Initialize a ReadFromBigtable transform.
+
+ :param table_id:
+ The ID of the table to read from.
+ :param instance_id:
+ The ID of the instance where the table resides.
+ :param project_id:
+ The GCP project ID.
+ :param expansion_service:
+ The address of the expansion service. If no expansion service is
+ provided, will attempt to run the default GCP expansion service.
+ """
+ super().__init__()
+ self._table_id = table_id
+ self._instance_id = instance_id
+ self._project_id = project_id
+ self._expansion_service = (
+ expansion_service or BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build'))
+ self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+ self._expansion_service, self.URN)
+
+ def expand(self, input):
+ external_read = SchemaAwareExternalTransform(
+ identifier=self.schematransform_config.identifier,
+ expansion_service=self._expansion_service,
+ rearrange_based_on_discovery=True,
+ tableId=self._table_id,
+ instanceId=self._instance_id,
+ projectId=self._project_id)
+
+ return (
+ input.pipeline
+ | external_read
+ | beam.ParDo(self._BeamRowToPartialRowData()))
+
+ # PartialRowData has some useful methods for querying data within a row.
+ # To make use of those methods and to give Python users a more familiar
+ # object, we process each Beam Row and return a PartialRowData equivalent.
+ class _BeamRowToPartialRowData(beam.DoFn):
+ def process(self, row):
+ key = row.key
+ families = row.column_families
+
+ # initialize PartialRowData object
+ partial_row: PartialRowData = PartialRowData(key)
+ for fam_name, col_fam in families.items():
+ if fam_name not in partial_row.cells:
+ partial_row.cells[fam_name] = {}
+ for col_qualifier, cells in col_fam.items():
+ # store column qualifier as bytes to follow PartialRowData behavior
+ col_qualifier_bytes = col_qualifier.encode()
+ if col_qualifier not in partial_row.cells[fam_name]:
+ partial_row.cells[fam_name][col_qualifier_bytes] = []
Review Comment:
Also, let's add unit tests to cover the inner loop here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]