[BEAM-2532] Memoizes TableSchema in BigQuerySourceBase instead of parsing the JSON schema for every record.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e86c004d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e86c004d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e86c004d Branch: refs/heads/DSL_SQL Commit: e86c004de5d4b5f8bd0c3c53207cf3c1760f5d8e Parents: d510175 Author: Neville Li <nevi...@spotify.com> Authored: Tue Jul 18 09:07:21 2017 -0400 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Jul 18 22:28:57 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e86c004d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 2de60a2..2b1eafe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -29,11 +29,16 @@ import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import 
java.io.IOException; +import java.io.Serializable; import java.util.List; import java.util.NoSuchElementException; +import javax.annotation.Nullable; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.AvroSource; @@ -168,10 +173,12 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { SerializableFunction<GenericRecord, TableRow> function = new SerializableFunction<GenericRecord, TableRow>() { + private Supplier<TableSchema> schema = Suppliers.memoize( + Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema))); + @Override public TableRow apply(GenericRecord input) { - return BigQueryAvroUtils.convertGenericRecordToTableRow( - input, BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class)); + return BigQueryAvroUtils.convertGenericRecordToTableRow(input, schema.get()); }}; List<BoundedSource<TableRow>> avroSources = Lists.newArrayList(); @@ -182,6 +189,14 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { return ImmutableList.copyOf(avroSources); } + private static class TableSchemaFunction implements Serializable, Function<String, TableSchema> { + @Nullable + @Override + public TableSchema apply(@Nullable String input) { + return BigQueryHelpers.fromJsonString(input, TableSchema.class); + } + } + protected static class BigQueryReader extends BoundedReader<TableRow> { private final BigQuerySourceBase source; private final BigQueryServices.BigQueryJsonReader reader;