Hi Noah!
Thanks for contributing this. The change is relatively small, so I think
it's best to discuss it in a pull request, where we can look at the diff.
My preliminary comment would be that the first part makes sense:

    if isinstance(schema, (str, unicode)):
        schema = bigquery_tools.parse_table_schema_from_json(schema)


And the second part seems unnecessary, but perhaps I'm missing something.
Why are you reparsing the schema?

    elif isinstance(schema, dict):
        schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
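For context, the dict branch can be illustrated with plain json: a dict has to be serialized before a JSON-string parser can consume it. This is a minimal Python 3 sketch, where `parse_schema` is a hypothetical stand-in for `bigquery_tools.parse_table_schema_from_json`, not the real Beam function:

```python
import json

def parse_schema(schema_json):
    # Hypothetical stand-in for bigquery_tools.parse_table_schema_from_json:
    # it accepts only a JSON *string* describing the table schema.
    return json.loads(schema_json)

def normalize_schema(schema):
    # Mirrors the proposed branches: a string is parsed directly, while a
    # dict is serialized first so the same string parser can be reused.
    if isinstance(schema, str):
        return parse_schema(schema)
    elif isinstance(schema, dict):
        return parse_schema(json.dumps(schema))
    return schema

schema_dict = {"fields": [{"name": "id", "type": "INTEGER"}]}
# Both the string form and the dict form normalize to the same result.
assert normalize_schema(json.dumps(schema_dict)) == normalize_schema(schema_dict)
```

The `json.dumps` round trip is what makes the dict case reduce to the already-supported string case, which seems to be the intent of the reparse.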


Thanks! Please submit a pull request, and tag me (@pabloem) to review.
Best
-P.
On Mon, Oct 21, 2019 at 3:38 PM Noah Goodrich <m...@noahgoodrich.com> wrote:

> I have created this issue (https://issues.apache.org/jira/browse/BEAM-8452
> ).
>
> The contributors guide suggests that if an issue is your first, it should
> be discussed on this mailing list. I would like to hear thoughts,
> questions, concerns etc on this proposed fix:
>
> def process(self, element, load_job_name_prefix, *schema_side_inputs):
>     # Each load job is assumed to have files respecting these constraints:
>     # 1. Total size of all files < 15 TB (Max size for load jobs)
>     # 2. Total no. of files in a single load job < 10,000
>     # This assumption means that there will always be a single load job
>     # triggered for each partition of files.
>     destination = element[0]
>     files = element[1]
>
>     if callable(self.schema):
>         schema = self.schema(destination, *schema_side_inputs)
>     elif isinstance(self.schema, vp.ValueProvider):
>         schema = self.schema.get()
>     else:
>         schema = self.schema
>
>     if isinstance(schema, (str, unicode)):
>         schema = bigquery_tools.parse_table_schema_from_json(schema)
>     elif isinstance(schema, dict):
>         schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
>
> ....
>
>
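The schema-resolution dispatch in the proposed fix can be sketched in isolation. This is a Python 3 sketch under stated assumptions: `FakeValueProvider` is a hypothetical stand-in for Beam's `vp.ValueProvider`, and `resolve_schema` is an illustrative helper, not part of the Beam API:

```python
class FakeValueProvider:
    # Hypothetical stand-in for apache_beam's vp.ValueProvider: wraps a
    # value that is only available via .get() at runtime.
    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value

def resolve_schema(schema, destination, *schema_side_inputs):
    # Mirrors the three branches in the proposed fix: a callable computes
    # the schema per destination, a ValueProvider is unwrapped, and
    # anything else (string, dict, TableSchema) passes through unchanged.
    if callable(schema):
        return schema(destination, *schema_side_inputs)
    elif isinstance(schema, FakeValueProvider):
        return schema.get()
    return schema

assert resolve_schema(lambda dest: dest.upper(), "my_table") == "MY_TABLE"
assert resolve_schema(FakeValueProvider("s"), "my_table") == "s"
assert resolve_schema({"fields": []}, "my_table") == {"fields": []}
```

Whatever comes out of this dispatch is what then hits the string/dict parsing branches under discussion.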