Simplify configuration of StreamWithDeDup

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1adcbaea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1adcbaea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1adcbaea

Branch: refs/heads/master
Commit: 1adcbaea799e83016ec91f7b7155c3a25804ce6c
Parents: 5c71589
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Mar 2 18:19:51 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Tue Mar 14 15:54:32 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 73 +++++++-------
 1 file changed, 26 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1adcbaea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index d2f6ba6..e039c8c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -40,7 +40,6 @@ import com.google.auto.value.AutoValue;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -462,7 +461,7 @@ public class BigQueryIO {
     abstract boolean getValidate();
     @Nullable abstract Boolean getFlattenResults();
     @Nullable abstract Boolean getUseLegacySql();
-    @Nullable abstract BigQueryServices getBigQueryServices();
+    abstract BigQueryServices getBigQueryServices();
 
     abstract Builder toBuilder();
 
@@ -645,8 +644,6 @@ public class BigQueryIO {
           jobUuid, new BeamJobUuidToBigQueryJobUuid());
 
       BoundedSource<TableRow> source;
-      final BigQueryServices bqServices =
-          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       final String extractDestinationDir;
       String tempLocation = bqOptions.getTempLocation();
@@ -670,11 +667,11 @@ public class BigQueryIO {
             getFlattenResults(),
             getUseLegacySql(),
             extractDestinationDir,
-            bqServices);
+            getBigQueryServices());
       } else {
         ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
         source = BigQueryTableSource.create(
-            jobIdToken, inputTable, extractDestinationDir, bqServices,
+            jobIdToken, inputTable, extractDestinationDir, getBigQueryServices(),
             StaticValueProvider.of(executingProject));
       }
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
@@ -687,7 +684,7 @@ public class BigQueryIO {
                   .setProjectId(executingProject)
                   .setJobId(getExtractJobId(jobIdToken));
 
-              Job extractJob = bqServices.getJobService(bqOptions)
+              Job extractJob = getBigQueryServices().getJobService(bqOptions)
                   .getJob(jobRef);
 
               Collection<String> extractFiles = null;
@@ -1390,7 +1387,7 @@ public class BigQueryIO {
     @Nullable abstract String getTableDescription();
     /** An option to indicate if table validation is desired. Default is true.
      */
     abstract boolean getValidate();
-    @Nullable abstract BigQueryServices getBigQueryServices();
+    abstract BigQueryServices getBigQueryServices();
 
     abstract Builder toBuilder();
@@ -1650,12 +1647,10 @@ public class BigQueryIO {
           "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
 
       // The user specified a table.
-      BigQueryServices bqServices =
-          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
       if (getJsonTableRef() != null && getValidate()) {
         TableReference table = getTableWithDefaultProject(options).get();
 
-        DatasetService datasetService = bqServices.getDatasetService(options);
+        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
         // Check for destination table presence and emptiness for early failure notification.
         // Note that a presence check can fail when the table or dataset is created by an earlier
         // stage of the pipeline. For these cases the #withoutValidation method can be used to
@@ -1693,7 +1688,7 @@ public class BigQueryIO {
       checkArgument(
           !Strings.isNullOrEmpty(tempLocation),
           "BigQueryIO.Write needs a GCS temp location to store temp files.");
-      if (bqServices == null) {
+      if (getBigQueryServices() == null) {
         try {
           GcsPath.fromUri(tempLocation);
         } catch (IllegalArgumentException e) {
@@ -1711,19 +1706,11 @@ public class BigQueryIO {
     public PDone expand(PCollection<TableRow> input) {
       Pipeline p = input.getPipeline();
       BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-      BigQueryServices bqServices =
-          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamWithDeDup and BigQuery's streaming import API.
       if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
-        return input.apply(
-            new StreamWithDeDup(getTable(), getTableRefFunction(),
-                getJsonSchema() == null ? null : NestedValueProvider.of(
-                    getJsonSchema(), new JsonSchemaToTableSchema()),
-                getCreateDisposition(),
-                getTableDescription(),
-                bqServices));
+        return input.apply(new StreamWithDeDup(this));
       }
 
       ValueProvider<TableReference> table = getTableWithDefaultProject(options);
@@ -1786,7 +1773,7 @@ public class BigQueryIO {
           .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
           .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
               false,
-              bqServices,
+              getBigQueryServices(),
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
@@ -1800,7 +1787,7 @@ public class BigQueryIO {
           .apply("TempTablesView", View.<String>asIterable());
       singleton.apply(ParDo
           .of(new WriteRename(
-              bqServices,
+              getBigQueryServices(),
               jobIdTokenView,
               NestedValueProvider.of(table, new TableRefToJson()),
               getWriteDisposition(),
@@ -1814,7 +1801,7 @@ public class BigQueryIO {
           .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
           .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
               true,
-              bqServices,
+              getBigQueryServices(),
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
@@ -2740,25 +2727,11 @@ public class BigQueryIO {
    * it leverages BigQuery best effort de-dup mechanism.
    */
   private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> {
-    @Nullable private final transient ValueProvider<TableReference> tableReference;
-    @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-    @Nullable private final transient ValueProvider<TableSchema> tableSchema;
-    private final Write.CreateDisposition createDisposition;
-    private final BigQueryServices bqServices;
-    @Nullable private final String tableDescription;
+    private final Write write;
 
     /** Constructor. */
-    StreamWithDeDup(ValueProvider<TableReference> tableReference,
-        @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-        @Nullable ValueProvider<TableSchema> tableSchema,
-        Write.CreateDisposition createDisposition,
-        @Nullable String tableDescription, BigQueryServices bqServices) {
-      this.tableReference = tableReference;
-      this.tableRefFunction = tableRefFunction;
-      this.tableSchema = tableSchema;
-      this.createDisposition = createDisposition;
-      this.bqServices = checkNotNull(bqServices, "bqServices");
-      this.tableDescription = tableDescription;
+    StreamWithDeDup(Write write) {
+      this.write = write;
     }
 
     @Override
@@ -2780,20 +2753,26 @@ public class BigQueryIO {
       PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input.apply(ParDo.of(
           new TagWithUniqueIdsAndTable(input.getPipeline().getOptions().as(BigQueryOptions.class),
-              tableReference, tableRefFunction)));
+              write.getTable(), write.getTableRefFunction())));
 
       // To prevent having the same TableRow processed more than once with regenerated
       // different unique ids, this implementation relies on "checkpointing", which is
       // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
       // performed by Reshuffle.
+      NestedValueProvider<TableSchema, String> schema =
+          write.getJsonSchema() == null
+              ? null
+              : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
       tagged
           .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
           .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-          .apply(ParDo.of(new StreamingWriteFn(
-              tableSchema,
-              createDisposition,
-              tableDescription,
-              bqServices)));
+          .apply(
+              ParDo.of(
+                  new StreamingWriteFn(
+                      schema,
+                      write.getCreateDisposition(),
+                      write.getTableDescription(),
+                      write.getBigQueryServices())));
 
       // Note that the implementation to return PDone here breaks the
       // implicit assumption about the job execution order. If a user
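----------------------------------------------------------------------

The heart of the change is a configuration-object refactoring: before this
commit, StreamWithDeDup mirrored six of Write's settings as constructor
parameters and fields; afterwards it holds the fully configured Write itself
and reads each setting through a getter at the point of use. Below is a
minimal, self-contained sketch of that pattern; every name in it is an
invented stand-in, not the real Beam type.

// Sketch only: simplified stand-ins for BigQueryIO.Write and StreamWithDeDup.
public final class ConfigObjectSketch {

  /** Stand-in for Write: an immutable bag of user-supplied settings. */
  static final class Write {
    private final String table;
    private final String createDisposition;

    Write(String table, String createDisposition) {
      this.table = table;
      this.createDisposition = createDisposition;
    }

    String getTable() { return table; }
    String getCreateDisposition() { return createDisposition; }
  }

  /** Stand-in for StreamWithDeDup after the commit: one field, not six. */
  static final class StreamWithDeDup {
    private final Write write;  // single source of truth for all settings

    StreamWithDeDup(Write write) { this.write = write; }

    void expand() {
      // Each setting is read through a getter where it is needed, so a new
      // Write option never changes this constructor's signature again.
      System.out.printf("streaming to %s (disposition %s)%n",
          write.getTable(), write.getCreateDisposition());
    }
  }

  public static void main(String[] args) {
    Write write = new Write("project:dataset.table", "CREATE_IF_NEEDED");
    new StreamWithDeDup(write).expand();
  }
}

The payoff is visible in the expand() hunk above: the table schema is now
derived from write.getJsonSchema() inside StreamWithDeDup.expand() itself,
instead of being precomputed and threaded through the constructor.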
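A second simplification rides along: getBigQueryServices() loses its
@Nullable annotation, so the scattered
MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl())
guards disappear from the Read and Write call sites. The diff does not show
where the non-null default is established (presumably when the transform is
constructed), so the sketch below only illustrates the general technique
under that assumption, with invented names rather than Beam code.

import java.util.Objects;

// Sketch only: the fallback is applied once, at construction, so the getter
// is guaranteed non-null and callers need no firstNonNull(...) guard.
public final class DefaultOnceSketch {

  interface Services {
    String label();
  }

  static final class ProductionServices implements Services {
    @Override public String label() { return "production"; }
  }

  static final class Transform {
    private final Services services;

    Transform(Services services) {
      // The null check and the default live in exactly one place.
      this.services = (services != null) ? services : new ProductionServices();
    }

    Services getServices() {
      return Objects.requireNonNull(services);  // documented invariant
    }
  }

  public static void main(String[] args) {
    // A caller who never supplied a Services still gets a usable one.
    System.out.println(new Transform(null).getServices().label());
  }
}

(Note that the validate() hunk above still carries an
if (getBigQueryServices() == null) branch; with a non-null getter that guard
can never be taken.)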
