[BEAM-2532] Memoizes TableSchema in BigQuerySourceBase

The TableSchema is now parsed from its JSON string once and reused, instead of being re-parsed for every record.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e86c004d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e86c004d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e86c004d

Branch: refs/heads/DSL_SQL
Commit: e86c004de5d4b5f8bd0c3c53207cf3c1760f5d8e
Parents: d510175
Author: Neville Li <nevi...@spotify.com>
Authored: Tue Jul 18 09:07:21 2017 -0400
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Tue Jul 18 22:28:57 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java  | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e86c004d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 2de60a2..2b1eafe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -29,11 +29,16 @@ import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.AvroSource;
@@ -168,10 +173,12 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
 
     SerializableFunction<GenericRecord, TableRow> function =
         new SerializableFunction<GenericRecord, TableRow>() {
+          private Supplier<TableSchema> schema = Suppliers.memoize(
+              Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema)));
+
           @Override
           public TableRow apply(GenericRecord input) {
-            return BigQueryAvroUtils.convertGenericRecordToTableRow(
-                input, BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class));
+            return BigQueryAvroUtils.convertGenericRecordToTableRow(input, schema.get());
           }};
 
     List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
@@ -182,6 +189,14 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     return ImmutableList.copyOf(avroSources);
   }
 
+  private static class TableSchemaFunction implements Serializable, Function<String, TableSchema> {
+    @Nullable
+    @Override
+    public TableSchema apply(@Nullable String input) {
+      return BigQueryHelpers.fromJsonString(input, TableSchema.class);
+    }
+  }
+
   protected static class BigQueryReader extends BoundedReader<TableRow> {
     private final BigQuerySourceBase source;
     private final BigQueryServices.BigQueryJsonReader reader;

Reply via email to