pabloem commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r902913424


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:

Review Comment:
   Please add a nice, detailed docstring (Pydoc) for the class.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  def __init__(
+      self,
+      method=None,
+      destination=None,
+      schema=None,
+      destination_load_jobid_pairs=None,
+      destination_file_pairs=None,
+      destination_copy_jobid_pairs=None,
+      failed_rows=None,
+      failed_rows_with_errors=None):
+
+    self.method = method
+    self.destination = destination
+    self.schema = schema
+    self.destination_load_jobid_pairs = destination_load_jobid_pairs
+    self.destination_file_pairs = destination_file_pairs
+    self.destination_copy_jobid_pairs = destination_copy_jobid_pairs
+    self.failed_rows = failed_rows
+    self.failed_rows_with_errors = failed_rows_with_errors
+
+    self.config = {
+        'method': method,
+        'destination': destination,
+        'schema': schema,
+    }
+
+    from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
+    self.attributes = {
+        BigQueryWriteFn.FAILED_ROWS: self.get_failed_rows,
+        BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: self.
+        get_failed_rows_with_errors,
+        BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS: self.
+        get_destination_load_jobid_pairs,
+        BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS: self.
+        get_destination_file_pairs,
+        BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS: self.
+        get_destination_copy_jobid_pairs,
+        'write_configuration': self.get_write_configuration
+    }
+
+  def get_write_configuration(self):
+    return self.config
+
+  def validate(self, method, attribute):
+    if self.method != method:
+      raise ValueError(
+          f'Cannot get {attribute} because it is not produced '
+          f'by {self.method} write method. Note: only {method} '
+          'produces this attribute.')
+
+  def get_destination_load_jobid_pairs(self):
+    self.validate('FILE_LOADS', 'DESTINATION_JOBID_PAIRS')
+
+    return self.destination_load_jobid_pairs
+
+  def get_destination_file_pairs(self):
+    self.validate('FILE_LOADS', 'DESTINATION_FILE_PAIRS')
+
+    return self.destination_file_pairs
+
+  def get_destination_copy_jobid_pairs(self):
+    self.validate('FILE_LOADS', 'DESTINATION_COPY_JOBID_PAIRS')
+
+    return self.destination_copy_jobid_pairs
+
+  def get_failed_rows(self):
+    self.validate('STREAMING_INSERTS', 'FAILED_ROWS')
+
+    return self.failed_rows
+
+  def get_failed_rows_with_errors(self):

Review Comment:
   I like this. A few changes I propose for these methods:
   - Remove the `get_` prefix from the names, so each method has the name of
     the property it exposes (the backing instance attributes will then have to
     be renamed, e.g. to `self._destination_file_pairs`).
   - Annotate them with `@property` so people can do
     `result.failed_rows_with_errors`.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  def __init__(
+      self,
+      method=None,
+      destination=None,
+      schema=None,
+      destination_load_jobid_pairs=None,
+      destination_file_pairs=None,
+      destination_copy_jobid_pairs=None,
+      failed_rows=None,
+      failed_rows_with_errors=None):
+
+    self.method = method
+    self.destination = destination
+    self.schema = schema
+    self.destination_load_jobid_pairs = destination_load_jobid_pairs
+    self.destination_file_pairs = destination_file_pairs
+    self.destination_copy_jobid_pairs = destination_copy_jobid_pairs
+    self.failed_rows = failed_rows
+    self.failed_rows_with_errors = failed_rows_with_errors
+
+    self.config = {
+        'method': method,
+        'destination': destination,
+        'schema': schema,
+    }
+
+    from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
+    self.attributes = {
+        BigQueryWriteFn.FAILED_ROWS: self.get_failed_rows,
+        BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: self.
+        get_failed_rows_with_errors,
+        BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS: self.
+        get_destination_load_jobid_pairs,
+        BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS: self.
+        get_destination_file_pairs,
+        BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS: self.
+        get_destination_copy_jobid_pairs,
+        'write_configuration': self.get_write_configuration
+    }
+
+  def get_write_configuration(self):
+    return self.config
+
+  def validate(self, method, attribute):
+    if self.method != method:
+      raise ValueError(
+          f'Cannot get {attribute} because it is not produced '
+          f'by {self.method} write method. Note: only {method} '
+          'produces this attribute.')
+
+  def get_destination_load_jobid_pairs(self):
+    self.validate('FILE_LOADS', 'DESTINATION_JOBID_PAIRS')
+
+    return self.destination_load_jobid_pairs
+
+  def get_destination_file_pairs(self):
+    self.validate('FILE_LOADS', 'DESTINATION_FILE_PAIRS')
+
+    return self.destination_file_pairs
+
+  def get_destination_copy_jobid_pairs(self):
+    self.validate('FILE_LOADS', 'DESTINATION_COPY_JOBID_PAIRS')
+
+    return self.destination_copy_jobid_pairs
+
+  def get_failed_rows(self):
+    self.validate('STREAMING_INSERTS', 'FAILED_ROWS')
+
+    return self.failed_rows
+
+  def get_failed_rows_with_errors(self):

Review Comment:
   Let me know if that makes sense.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  def __init__(
+      self,
+      method=None,
+      destination=None,
+      schema=None,
+      destination_load_jobid_pairs=None,
+      destination_file_pairs=None,
+      destination_copy_jobid_pairs=None,
+      failed_rows=None,
+      failed_rows_with_errors=None):
+
+    self.method = method
+    self.destination = destination
+    self.schema = schema
+    self.destination_load_jobid_pairs = destination_load_jobid_pairs
+    self.destination_file_pairs = destination_file_pairs
+    self.destination_copy_jobid_pairs = destination_copy_jobid_pairs
+    self.failed_rows = failed_rows
+    self.failed_rows_with_errors = failed_rows_with_errors

Review Comment:
   Please annotate the types for all of these attributes.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  def __init__(
+      self,
+      method=None,
+      destination=None,
+      schema=None,

Review Comment:
   Do you think we need `destination` and `schema`? Maybe we don't really need
   them. What are your thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to