Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk aa9071d56 -> 739a43197


Parse table schema from JSON


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4c2f62b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4c2f62b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4c2f62b

Branch: refs/heads/python-sdk
Commit: b4c2f62be8a809b666089e7b2fe5dada9cbd6c16
Parents: aa9071d
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Wed Nov 30 13:48:28 2016 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Dec 1 09:07:27 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/bigquery.py      | 37 ++++++++++++++++++++++++
 sdks/python/apache_beam/io/bigquery_test.py | 22 ++++++++++++++
 2 files changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py 
b/sdks/python/apache_beam/io/bigquery.py
index 8d7892a..0885e3a 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -200,6 +200,43 @@ class TableRowJsonCoder(coders.Coder):
         f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()])
 
 
+def parse_table_schema_from_json(schema_string):
+  """Parse the Table Schema provided as string.
+
+  Args:
+    schema_string: String serialized table schema, should be a valid JSON.
+
+  Returns:
+    A TableSchema of the BigQuery export from either the Query or the Table.
+  """
+  json_schema = json.loads(schema_string)
+
+  def _parse_schema_field(field):
+    """Parse a single schema field from dictionary.
+
+    Args:
+      field: Dictionary object containing serialized schema.
+
+    Returns:
+      A TableFieldSchema for a single column in BigQuery.
+    """
+    schema = bigquery.TableFieldSchema()
+    schema.name = field['name']
+    schema.type = field['type']
+    if 'mode' in field:
+      schema.mode = field['mode']
+    else:
+      schema.mode = 'NULLABLE'
+    if 'description' in field:
+      schema.description = field['description']
+    if 'fields' in field:
+      schema.fields = [_parse_schema_field(x) for x in field['fields']]
+    return schema
+
+  fields = [_parse_schema_field(f) for f in json_schema['fields']]
+  return bigquery.TableSchema(fields=fields)
+
+
 class BigQueryDisposition(object):
   """Class holding standard strings used for create and write dispositions."""
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery_test.py 
b/sdks/python/apache_beam/io/bigquery_test.py
index b0c3bbe..e263e13 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -32,6 +32,7 @@ from apache_beam.internal.clients import bigquery
 from apache_beam.internal.json_value import to_json_value
 from apache_beam.io.bigquery import RowAsDictJsonCoder
 from apache_beam.io.bigquery import TableRowJsonCoder
+from apache_beam.io.bigquery import parse_table_schema_from_json
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.utils.options import PipelineOptions
@@ -113,6 +114,27 @@ class TestTableRowJsonCoder(unittest.TestCase):
     self.json_compliance_exception(float('-inf'))
 
 
+class TestTableSchemaParser(unittest.TestCase):
+  def test_parse_table_schema_from_json(self):
+    string_field = bigquery.TableFieldSchema(
+        name='s', type='STRING', mode='NULLABLE', description='s description')
+    number_field = bigquery.TableFieldSchema(
+        name='n', type='INTEGER', mode='REQUIRED', description='n description')
+    record_field = bigquery.TableFieldSchema(
+        name='r', type='RECORD', mode='REQUIRED', description='r description',
+        fields=[string_field, number_field])
+    expected_schema = bigquery.TableSchema(fields=[record_field])
+    json_str = json.dumps({'fields': [
+        {'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED',
+         'description': 'r description', 'fields': [
+             {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE',
+              'description': 's description'},
+             {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED',
+              'description': 'n description'}]}]})
+    self.assertEqual(parse_table_schema_from_json(json_str),
+                     expected_schema)
+
+
 class TestBigQuerySource(unittest.TestCase):
 
   def test_display_data_item_on_validate_true(self):

Reply via email to