Port BigQueryIO to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6395e9d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6395e9d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6395e9d Branch: refs/heads/gearpump-runner Commit: d6395e9d45dcbeb9b3d3e2f8214a49866622b9cf Parents: 87313f1 Author: Kenneth Knowles <[email protected]> Authored: Fri Aug 5 12:26:53 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Aug 8 11:35:17 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 21 ++++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6395e9d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index ed2c32e..36e09f1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -1785,7 +1784,7 @@ public class BigQueryIO { return PDone.in(input.getPipeline()); } - private class WriteBundles extends OldDoFn<TableRow, KV<String, Long>> { + private class WriteBundles extends DoFn<TableRow, KV<String, Long>> { private TableRowWriter writer = null; private final String tempFilePrefix; @@ -1793,7 +1792,7 @@ public class BigQueryIO { this.tempFilePrefix = tempFilePrefix; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { if (writer == null) { writer = new TableRowWriter(tempFilePrefix); @@ -1806,7 +1805,7 @@ public class BigQueryIO { // Discard write result and close the write. try { writer.close(); - // The writer does not need to be reset, as this OldDoFn cannot be reused. + // The writer does not need to be reset, as this DoFn cannot be reused. } catch (Exception closeException) { // Do not mask the exception that caused the write to fail. e.addSuppressed(closeException); @@ -1815,7 +1814,7 @@ public class BigQueryIO { } } - @Override + @FinishBundle public void finishBundle(Context c) throws Exception { if (writer != null) { c.output(writer.close()); @@ -1959,7 +1958,7 @@ public class BigQueryIO { /** * Partitions temporary files based on number of files and file sizes. */ - static class WritePartition extends OldDoFn<String, KV<Long, List<String>>> { + static class WritePartition extends DoFn<String, KV<Long, List<String>>> { private final PCollectionView<Iterable<KV<String, Long>>> resultsView; private TupleTag<KV<Long, List<String>>> multiPartitionsTag; private TupleTag<KV<Long, List<String>>> singlePartitionTag; @@ -1973,7 +1972,7 @@ public class BigQueryIO { this.singlePartitionTag = singlePartitionTag; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView)); if (results.isEmpty()) { @@ -2015,7 +2014,7 @@ public class BigQueryIO { /** * Writes partitions to BigQuery tables. */ - static class WriteTables extends OldDoFn<KV<Long, Iterable<List<String>>>, String> { + static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> { private final boolean singlePartition; private final BigQueryServices bqServices; private final String jobIdToken; @@ -2044,7 +2043,7 @@ public class BigQueryIO { this.createDisposition = createDisposition; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { List<String> partition = Lists.newArrayList(c.element().getValue()).get(0); String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey()); @@ -2149,7 +2148,7 @@ public class BigQueryIO { /** * Copies temporary tables to destination table. */ - static class WriteRename extends OldDoFn<String, Void> { + static class WriteRename extends DoFn<String, Void> { private final BigQueryServices bqServices; private final String jobIdToken; private final String jsonTableRef; @@ -2172,7 +2171,7 @@ public class BigQueryIO { this.tempTablesView = tempTablesView; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));
