Replace BigQueryIO.Write.to() with BigQueryIO.write().to()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/101715a7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/101715a7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/101715a7 Branch: refs/heads/master Commit: 101715a788509aa9bdfdefd54eaceca35feca485 Parents: 1a252a7 Author: Eugene Kirpichov <[email protected]> Authored: Mon Mar 13 16:21:39 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue Mar 14 15:54:39 2017 -0700 ---------------------------------------------------------------------- .../beam/examples/complete/AutoComplete.java | 2 +- .../examples/complete/StreamingWordExtract.java | 2 +- .../examples/complete/TrafficMaxLaneFlow.java | 2 +- .../beam/examples/complete/TrafficRoutes.java | 2 +- .../examples/cookbook/BigQueryTornadoes.java | 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../beam/examples/cookbook/FilterExamples.java | 2 +- .../examples/cookbook/MaxPerKeyExamples.java | 2 +- .../beam/examples/cookbook/TriggerExample.java | 2 +- .../complete/game/utils/WriteToBigQuery.java | 2 +- .../game/utils/WriteWindowedToBigQuery.java | 2 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 46 ++++++++++++-------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 46 ++++++++++---------- 13 files changed, 62 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index 861a292..fba3dc0 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -491,7 +491,7 @@ public class AutoComplete { toWrite .apply(ParDo.of(new FormatForBigquery())) - .apply(BigQueryIO.Write + .apply(BigQueryIO.write() .to(tableRef) .withSchema(FormatForBigquery.getSchema()) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java index e8d8950..2e7d451 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java @@ -136,7 +136,7 @@ public class StreamingWordExtract { .apply(ParDo.of(new ExtractWords())) .apply(ParDo.of(new Uppercase())) .apply(ParDo.of(new StringToRowConverter())) - .apply(BigQueryIO.Write.to(tableSpec) + .apply(BigQueryIO.write().to(tableSpec) .withSchema(StringToRowConverter.getSchema())); PipelineResult result = pipeline.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java index 412f7fb..c9508eb 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java @@ -348,7 +348,7 @@ public class TrafficMaxLaneFlow { Duration.standardMinutes(options.getWindowDuration())). every(Duration.standardMinutes(options.getWindowSlideEvery())))) .apply(new MaxLaneFlow()) - .apply(BigQueryIO.Write.to(tableRef) + .apply(BigQueryIO.write().to(tableRef) .withSchema(FormatMaxesFn.getSchema())); // Run the pipeline. http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java index 50d3ae4..fc5eb89 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java @@ -359,7 +359,7 @@ public class TrafficRoutes { Duration.standardMinutes(options.getWindowDuration())). every(Duration.standardMinutes(options.getWindowSlideEvery())))) .apply(new TrackSpeed()) - .apply(BigQueryIO.Write.to(tableRef) + .apply(BigQueryIO.write().to(tableRef) .withSchema(FormatStatsFn.getSchema())); // Run the pipeline. http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java index d3c9167..f8bc104 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java @@ -158,7 +158,7 @@ public class BigQueryTornadoes { p.apply(BigQueryIO.read().from(options.getInput())) .apply(new CountTornadoes()) - .apply(BigQueryIO.Write + .apply(BigQueryIO.write() .to(options.getOutput()) .withSchema(schema) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java index fc54b13..a7016b0 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java @@ -202,7 +202,7 @@ public class CombinePerKeyExamples { p.apply(BigQueryIO.read().from(options.getInput())) .apply(new PlaysForWord()) - .apply(BigQueryIO.Write + .apply(BigQueryIO.write() .to(options.getOutput()) .withSchema(schema) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java index 714a8f2..7adf7c6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java @@ -241,7 +241,7 @@ public class FilterExamples { p.apply(BigQueryIO.read().from(options.getInput())) .apply(ParDo.of(new ProjectionFn())) .apply(new BelowGlobalMean(options.getMonthFilter())) - .apply(BigQueryIO.Write + .apply(BigQueryIO.write() .to(options.getOutput()) .withSchema(schema) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java index 7e7bc72..a8dc7f9 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java @@ -151,7 +151,7 @@ public class MaxPerKeyExamples { p.apply(BigQueryIO.read().from(options.getInput())) .apply(new MaxMeanTemp()) - .apply(BigQueryIO.Write + .apply(BigQueryIO.write() .to(options.getOutput()) .withSchema(schema) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index af4a692..7048bde 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -452,7 +452,7 @@ public class TriggerExample { .apply(new CalculateTotalFlow(options.getWindowDuration())); for (int i = 0; i < resultList.size(); i++){ - resultList.get(i).apply(BigQueryIO.Write.to(tableRef).withSchema(getSchema())); + resultList.get(i).apply(BigQueryIO.write().to(tableRef).withSchema(getSchema())); } PipelineResult result = pipeline.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java index 1f33915..c124624 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java @@ -121,7 +121,7 @@ public class WriteToBigQuery<InputT> public PDone expand(PCollection<InputT> teamAndScore) { return teamAndScore .apply("ConvertToRow", ParDo.of(new BuildRowFn())) - .apply(BigQueryIO.Write + .apply(BigQueryIO.write() .to(getTable(teamAndScore.getPipeline(), tableName)) .withSchema(getSchema()) http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java index 7a4fb2c..7d16fa9 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java @@ -60,7 +60,7 @@ public class WriteWindowedToBigQuery<T> public PDone expand(PCollection<T> teamAndScore) { return teamAndScore .apply("ConvertToRow", ParDo.of(new BuildRowFn())) - .apply(BigQueryIO.Write + .apply(BigQueryIO.write() .to(getTable(teamAndScore.getPipeline(), tableName)) .withSchema(getSchema()) http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index dfb7ea6..3e699d2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1351,12 +1351,12 @@ public class BigQueryIO { * exist. * * <p>By default, tables will be created if they do not exist, which corresponds to a - * {@link CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of BigQuery's - * Jobs API. A schema must be provided (via {@link BigQueryIO.Write#withSchema(TableSchema)}), + * {@link Write.CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of + * BigQuery's Jobs API. A schema must be provided (via {@link Write#withSchema(TableSchema)}), * or else the transform may fail at runtime with an {@link IllegalArgumentException}. * * <p>By default, writes require an empty table, which corresponds to - * a {@link WriteDisposition#WRITE_EMPTY} disposition that matches the + * a {@link Write.WriteDisposition#WRITE_EMPTY} disposition that matches the * default of BigQuery's Jobs API. * * <p>Here is a sample transform that produces TableRow values containing @@ -1371,6 +1371,16 @@ public class BigQueryIO { * } * }}</pre> */ + public static Write write() { + return new AutoValue_BigQueryIO_Write.Builder() + .setValidate(true) + .setBigQueryServices(new BigQueryServicesImpl()) + .setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED) + .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY) + .build(); + } + + /** Implementation of {@link #write}. */ @AutoValue public abstract static class Write extends PTransform<PCollection<TableRow>, PDone> { @VisibleForTesting @@ -1416,14 +1426,6 @@ public class BigQueryIO { abstract Write build(); } - private static Builder builder() { - return new AutoValue_BigQueryIO_Write.Builder() - .setValidate(true) - .setBigQueryServices(new BigQueryServicesImpl()) - .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .setWriteDisposition(WriteDisposition.WRITE_EMPTY); - } - /** * An enumeration type for the BigQuery create disposition strings. * @@ -1491,23 +1493,30 @@ public class BigQueryIO { WRITE_EMPTY } + /** Ensures that methods of the to() family are called at most once. */ + private void ensureToNotCalledYet() { + checkState( + getJsonTableRef() == null && getTable() == null, "to() already called"); + } + /** * Creates a write transformation for the given table specification. * * <p>Refer to {@link #parseTableSpec(String)} for the specification format. */ - public static Write to(String tableSpec) { + public Write to(String tableSpec) { return to(StaticValueProvider.of(tableSpec)); } /** Creates a write transformation for the given table. */ - public static Write to(TableReference table) { + public Write to(TableReference table) { return to(StaticValueProvider.of(toTableSpec(table))); } /** Creates a write transformation for the given table. */ - public static Write to(ValueProvider<String> tableSpec) { - return builder() + public Write to(ValueProvider<String> tableSpec) { + ensureToNotCalledYet(); + return toBuilder() .setJsonTableRef( NestedValueProvider.of( NestedValueProvider.of(tableSpec, new TableSpecToTableRef()), @@ -1526,7 +1535,7 @@ public class BigQueryIO { * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should * always return the same table specification. */ - public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) { + public Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) { return toTableReference(new TranslateTableSpecFunction(tableSpecFunction)); } @@ -1537,9 +1546,10 @@ public class BigQueryIO { * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should * always return the same table reference. */ - private static Write toTableReference( + private Write toTableReference( SerializableFunction<BoundedWindow, TableReference> tableRefFunction) { - return builder().setTableRefFunction(tableRefFunction).build(); + ensureToNotCalledYet(); + return toBuilder().setTableRefFunction(tableRefFunction).build(); } private static class TranslateTableSpecFunction implements http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index f6a7fb4..8a53d02 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -963,7 +963,7 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "b").set("number", 2), new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.to("dataset-id.table-id") + .apply(BigQueryIO.write().to("dataset-id.table-id") .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withSchema(new TableSchema().setFields( ImmutableList.of( @@ -997,7 +997,7 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "d").set("number", 4)) .withCoder(TableRowJsonCoder.of())) .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) - .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id") + .apply(BigQueryIO.write().to("project-id:dataset-id.table-id") .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withSchema(new TableSchema().setFields( ImmutableList.of( @@ -1153,7 +1153,7 @@ public class BigQueryIOTest implements Serializable { p.apply(Create.of(inserts).withCoder(TableRowJsonCoder.of())) .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) .apply(Window.<TableRow>into(window)) - .apply(BigQueryIO.Write + .apply(BigQueryIO.write() .to(tableFunction) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withSchema(new TableSchema().setFields( @@ -1205,7 +1205,7 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "b").set("number", 2), new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id") + .apply(BigQueryIO.write().to("project-id:dataset-id.table-id") .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withTestServices(fakeBqServices) .withoutValidation()); @@ -1238,7 +1238,7 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "b").set("number", 2), new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.to("dataset-id.table-id") + .apply(BigQueryIO.write().to("dataset-id.table-id") .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withTestServices(fakeBqServices) .withoutValidation()); @@ -1328,7 +1328,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWrite() { BigQueryIO.Write write = - BigQueryIO.Write.to("foo.com:project:somedataset.sometable"); + BigQueryIO.write().to("foo.com:project:somedataset.sometable"); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); @@ -1354,7 +1354,7 @@ public class BigQueryIOTest implements Serializable { options.as(StreamingOptions.class).setStreaming(streaming); DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options); - BigQueryIO.Write write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.write() .to("project:dataset.table") .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) .withTestServices(new FakeBigQueryServices() @@ -1375,7 +1375,7 @@ public class BigQueryIOTest implements Serializable { // This test just checks that using withoutValidation will not trigger object // construction errors. BigQueryIO.Write write = - BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation(); + BigQueryIO.write().to("foo.com:project:somedataset.sometable").withoutValidation(); checkWriteObjectWithValidate( write, "foo.com:project", @@ -1390,7 +1390,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWriteDefaultProject() { - BigQueryIO.Write write = BigQueryIO.Write.to("somedataset.sometable"); + BigQueryIO.Write write = BigQueryIO.write().to("somedataset.sometable"); checkWriteObject( write, null, "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); @@ -1402,7 +1402,7 @@ public class BigQueryIOTest implements Serializable { .setProjectId("foo.com:project") .setDatasetId("somedataset") .setTableId("sometable"); - BigQueryIO.Write write = BigQueryIO.Write.to(table); + BigQueryIO.Write write = BigQueryIO.write().to(table); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); @@ -1412,7 +1412,7 @@ public class BigQueryIOTest implements Serializable { public void testBuildWriteWithSchema() { TableSchema schema = new TableSchema(); BigQueryIO.Write write = - BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema); + BigQueryIO.write().to("foo.com:project:somedataset.sometable").withSchema(schema); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); @@ -1420,7 +1420,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWriteWithCreateDispositionNever() { - BigQueryIO.Write write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.write() .to("foo.com:project:somedataset.sometable") .withCreateDisposition(CreateDisposition.CREATE_NEVER); checkWriteObject( @@ -1430,7 +1430,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWriteWithCreateDispositionIfNeeded() { - BigQueryIO.Write write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.write() .to("foo.com:project:somedataset.sometable") .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED); checkWriteObject( @@ -1440,7 +1440,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWriteWithWriteDispositionTruncate() { - BigQueryIO.Write write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.write() .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); checkWriteObject( @@ -1450,7 +1450,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWriteWithWriteDispositionAppend() { - BigQueryIO.Write write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.write() .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_APPEND); checkWriteObject( @@ -1460,7 +1460,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWriteWithWriteDispositionEmpty() { - BigQueryIO.Write write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.write() .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_EMPTY); checkWriteObject( @@ -1471,7 +1471,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWriteWithWriteWithTableDescription() { final String tblDescription = "foo bar table"; - BigQueryIO.Write write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.write() .to("foo.com:project:somedataset.sometable") .withTableDescription(tblDescription); checkWriteObject( @@ -1491,7 +1491,7 @@ public class BigQueryIOTest implements Serializable { TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2"); final String tblDescription = "foo bar table"; - BigQueryIO.Write write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.write() .to(tableSpec) .withSchema(schema) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) @@ -1556,7 +1556,7 @@ public class BigQueryIOTest implements Serializable { .or(Matchers.containsString("BigQuery dataset not found for table"))); tableRows .apply( - BigQueryIO.Write.to(tableRef) + BigQueryIO.write().to(tableRef) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withSchema(new TableSchema()) .withTestServices(fakeBqServices)); @@ -1603,7 +1603,7 @@ public class BigQueryIOTest implements Serializable { })) .setCoder(TableRowJsonCoder.of()); tableRows - .apply(BigQueryIO.Write.to(tableRef) + .apply(BigQueryIO.write().to(tableRef) .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withoutValidation()); } @@ -1675,7 +1675,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBigQueryIOGetName() { assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName()); - assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName()); + assertEquals("BigQueryIO.Write", BigQueryIO.write().to("somedataset.sometable").getName()); } @Test @@ -1686,7 +1686,7 @@ public class BigQueryIOTest implements Serializable { thrown.expectMessage("no schema was provided"); p .apply(Create.empty(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write + .apply(BigQueryIO.write() .to("dataset.table") .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)); } @@ -2343,7 +2343,7 @@ public class BigQueryIOTest implements Serializable { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline pipeline = TestPipeline.create(options); - BigQueryIO.Write write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.write() .to(options.getOutputTable()) .withSchema(NestedValueProvider.of( options.getOutputSchema(), new JsonSchemaToTableSchema()))
