Refactor BigQueryIO helper methods into a BigQueryHelpers class. Move helper transforms for BigQueryIO.Read into individual files. Change the visibility of the moved BigQueryHelpers members from private to package-private.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2cc2e81f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2cc2e81f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2cc2e81f Branch: refs/heads/master Commit: 2cc2e81f241e1d5d590f12d4a8491273908e2cce Parents: 434eadb Author: Reuven Lax <[email protected]> Authored: Fri Mar 17 15:15:16 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Mar 28 08:46:15 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 243 ++++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 853 ++----------------- .../io/gcp/bigquery/BigQueryQuerySource.java | 186 ++++ .../io/gcp/bigquery/BigQueryServicesImpl.java | 2 +- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 178 ++++ .../io/gcp/bigquery/BigQueryTableSource.java | 86 ++ .../io/gcp/bigquery/PassThroughThenCleanup.java | 66 ++ .../sdk/io/gcp/bigquery/TransformingSource.java | 118 +++ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 54 +- .../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 12 +- 10 files changed, 970 insertions(+), 828 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java new file mode 100644 index 0000000..9fba938 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -0,0 +1,243 @@ +package org.apache.beam.sdk.io.gcp.bigquery; + +import 
com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.regex.Matcher; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.SerializableFunction; + + +/** + * A set of helper functions and classes used by {@link BigQueryIO}. + */ +public class BigQueryHelpers { + @Nullable + /** + * Return a displayable string representation for a {@link TableReference}. + */ + static ValueProvider<String> displayTable( + @Nullable ValueProvider<TableReference> table) { + if (table == null) { + return null; + } + return NestedValueProvider.of(table, new TableRefToTableSpec()); + } + + /** + * Returns a canonical string representation of the {@link TableReference}. + */ + static String toTableSpec(TableReference ref) { + StringBuilder sb = new StringBuilder(); + if (ref.getProjectId() != null) { + sb.append(ref.getProjectId()); + sb.append(":"); + } + + sb.append(ref.getDatasetId()).append('.').append(ref.getTableId()); + return sb.toString(); + } + + static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) { + List<V> value = map.get(key); + if (value == null) { + value = new ArrayList<>(); + map.put(key, value); + } + return value; + } + + /** + * Parse a table specification in the form + * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}. + * + * <p>If the project id is omitted, the default project id is used. 
+ */ + static TableReference parseTableSpec(String tableSpec) { + Matcher match = BigQueryIO.TABLE_SPEC.matcher(tableSpec); + if (!match.matches()) { + throw new IllegalArgumentException( + "Table reference is not in [project_id]:[dataset_id].[table_id] " + + "format: " + tableSpec); + } + + TableReference ref = new TableReference(); + ref.setProjectId(match.group("PROJECT")); + + return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE")); + } + + static String jobToPrettyString(@Nullable Job job) throws IOException { + return job == null ? "null" : job.toPrettyString(); + } + + static String statusToPrettyString(@Nullable JobStatus status) throws IOException { + return status == null ? "Unknown status: null." : status.toPrettyString(); + } + + static Status parseStatus(@Nullable Job job) { + if (job == null) { + return Status.UNKNOWN; + } + JobStatus status = job.getStatus(); + if (status.getErrorResult() != null) { + return Status.FAILED; + } else if (status.getErrors() != null && !status.getErrors().isEmpty()) { + return Status.FAILED; + } else { + return Status.SUCCEEDED; + } + } + + @VisibleForTesting + static String toJsonString(Object item) { + if (item == null) { + return null; + } + try { + return BigQueryIO.JSON_FACTORY.toString(item); + } catch (IOException e) { + throw new RuntimeException( + String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()), + e); + } + } + + @VisibleForTesting + static <T> T fromJsonString(String json, Class<T> clazz) { + if (json == null) { + return null; + } + try { + return BigQueryIO.JSON_FACTORY.fromString(json, clazz); + } catch (IOException e) { + throw new RuntimeException( + String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json), + e); + } + } + + /** + * Returns a randomUUID string. + * + * <p>{@code '-'} is removed because BigQuery doesn't allow it in dataset id. 
+ */ + static String randomUUIDString() { + return UUID.randomUUID().toString().replaceAll("-", ""); + } + + @VisibleForTesting + static class JsonSchemaToTableSchema + implements SerializableFunction<String, TableSchema> { + @Override + public TableSchema apply(String from) { + return fromJsonString(from, TableSchema.class); + } + } + + @VisibleForTesting + static class BeamJobUuidToBigQueryJobUuid + implements SerializableFunction<String, String> { + @Override + public String apply(String from) { + return "beam_job_" + from; + } + } + + static class TableSchemaToJsonSchema + implements SerializableFunction<TableSchema, String> { + @Override + public String apply(TableSchema from) { + return toJsonString(from); + } + } + + static class JsonTableRefToTableRef + implements SerializableFunction<String, TableReference> { + @Override + public TableReference apply(String from) { + return fromJsonString(from, TableReference.class); + } + } + + static class TableRefToTableSpec + implements SerializableFunction<TableReference, String> { + @Override + public String apply(TableReference from) { + return toTableSpec(from); + } + } + + static class TableRefToJson + implements SerializableFunction<TableReference, String> { + @Override + public String apply(TableReference from) { + return toJsonString(from); + } + } + + static class TableRefToProjectId + implements SerializableFunction<TableReference, String> { + @Override + public String apply(TableReference from) { + return from.getProjectId(); + } + } + + @VisibleForTesting + static class TableSpecToTableRef + implements SerializableFunction<String, TableReference> { + @Override + public TableReference apply(String from) { + return parseTableSpec(from); + } + } + + @VisibleForTesting + static class CreatePerBeamJobUuid + implements SerializableFunction<String, String> { + private final String stepUuid; + + CreatePerBeamJobUuid(String stepUuid) { + this.stepUuid = stepUuid; + } + + @Override + public String apply(String 
jobUuid) { + return stepUuid + "_" + jobUuid.replaceAll("-", ""); + } + } + + @VisibleForTesting + static class CreateJsonTableRefFromUuid + implements SerializableFunction<String, TableReference> { + private final String executingProject; + + public CreateJsonTableRefFromUuid(String executingProject) { + this.executingProject = executingProject; + } + + @Override + public TableReference apply(String jobUuid) { + String queryTempDatasetId = "temp_dataset_" + jobUuid; + String queryTempTableId = "temp_table_" + jobUuid; + TableReference queryTempTableRef = new TableReference() + .setProjectId(executingProject) + .setDatasetId(queryTempDatasetId) + .setTableId(queryTempTableId); + return queryTempTableRef; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index d195afd..3f2f3e8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -25,13 +25,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import 
com.google.api.services.bigquery.model.JobStatistics; -import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; @@ -39,38 +37,31 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.value.AutoValue; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.io.CountingOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.ObjectInputStream; import java.io.OutputStream; -import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; -import org.apache.avro.generic.GenericRecord; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; @@ -82,8 +73,16 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; -import 
org.apache.beam.sdk.io.AvroSource; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreatePerBeamJobUuid; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.metrics.Counter; @@ -131,7 +130,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; -import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +151,7 @@ import org.slf4j.LoggerFactory; * from the <a href="https://cloud.google.com/bigquery/client-libraries"> * BigQuery Java Client API</a>. * Tables can be referred to as Strings, with or without the {@code projectId}. - * A helper function is provided ({@link BigQueryIO#parseTableSpec(String)}) + * A helper function is provided ({@link BigQueryHelpers#parseTableSpec(String)}) * that parses the following string forms into a {@link TableReference}: * * <ul> @@ -253,7 +251,7 @@ public class BigQueryIO { * Singleton instance of the JSON factory used to read and write JSON * formatted rows. 
*/ - private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory(); + static final JsonFactory JSON_FACTORY = Transport.getJsonFactory(); /** * Project IDs must contain 6-63 lowercase letters, digits, or dashes. @@ -281,7 +279,7 @@ public class BigQueryIO { String.format("((?<PROJECT>%s):)?(?<DATASET>%s)\\.(?<TABLE>%s)", PROJECT_ID_REGEXP, DATASET_REGEXP, TABLE_REGEXP); - private static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP); + static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP); private static final String RESOURCE_NOT_FOUND_ERROR = "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline" @@ -293,152 +291,6 @@ public class BigQueryIO { + " an earlier stage of the pipeline, this validation can be disabled using" + " #withoutValidation."; - /** - * Parse a table specification in the form - * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}. - * - * <p>If the project id is omitted, the default project id is used. - */ - public static TableReference parseTableSpec(String tableSpec) { - Matcher match = TABLE_SPEC.matcher(tableSpec); - if (!match.matches()) { - throw new IllegalArgumentException( - "Table reference is not in [project_id]:[dataset_id].[table_id] " - + "format: " + tableSpec); - } - - TableReference ref = new TableReference(); - ref.setProjectId(match.group("PROJECT")); - - return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE")); - } - - /** - * Returns a canonical string representation of the {@link TableReference}. 
- */ - public static String toTableSpec(TableReference ref) { - StringBuilder sb = new StringBuilder(); - if (ref.getProjectId() != null) { - sb.append(ref.getProjectId()); - sb.append(":"); - } - - sb.append(ref.getDatasetId()).append('.').append(ref.getTableId()); - return sb.toString(); - } - - @VisibleForTesting - static class JsonSchemaToTableSchema - implements SerializableFunction<String, TableSchema> { - @Override - public TableSchema apply(String from) { - return fromJsonString(from, TableSchema.class); - } - } - - private static class TableSchemaToJsonSchema - implements SerializableFunction<TableSchema, String> { - @Override - public String apply(TableSchema from) { - return toJsonString(from); - } - } - - private static class JsonTableRefToTableRef - implements SerializableFunction<String, TableReference> { - @Override - public TableReference apply(String from) { - return fromJsonString(from, TableReference.class); - } - } - - private static class TableRefToTableSpec - implements SerializableFunction<TableReference, String> { - @Override - public String apply(TableReference from) { - return toTableSpec(from); - } - } - - private static class TableRefToJson - implements SerializableFunction<TableReference, String> { - @Override - public String apply(TableReference from) { - return toJsonString(from); - } - } - - private static class TableRefToProjectId - implements SerializableFunction<TableReference, String> { - @Override - public String apply(TableReference from) { - return from.getProjectId(); - } - } - - @VisibleForTesting - static class TableSpecToTableRef - implements SerializableFunction<String, TableReference> { - @Override - public TableReference apply(String from) { - return parseTableSpec(from); - } - } - - @VisibleForTesting - static class BeamJobUuidToBigQueryJobUuid - implements SerializableFunction<String, String> { - @Override - public String apply(String from) { - return "beam_job_" + from; - } - } - - @VisibleForTesting - static class 
CreatePerBeamJobUuid - implements SerializableFunction<String, String> { - private final String stepUuid; - - private CreatePerBeamJobUuid(String stepUuid) { - this.stepUuid = stepUuid; - } - - @Override - public String apply(String jobUuid) { - return stepUuid + "_" + jobUuid.replaceAll("-", ""); - } - } - - @VisibleForTesting - static class CreateJsonTableRefFromUuid - implements SerializableFunction<String, TableReference> { - private final String executingProject; - - private CreateJsonTableRefFromUuid(String executingProject) { - this.executingProject = executingProject; - } - - @Override - public TableReference apply(String jobUuid) { - String queryTempDatasetId = "temp_dataset_" + jobUuid; - String queryTempTableId = "temp_table_" + jobUuid; - TableReference queryTempTableRef = new TableReference() - .setProjectId(executingProject) - .setDatasetId(queryTempDatasetId) - .setTableId(queryTempTableId); - return queryTempTableRef; - } - } - - @Nullable - private static ValueProvider<String> displayTable( - @Nullable ValueProvider<TableReference> table) { - if (table == null) { - return null; - } - return NestedValueProvider.of(table, new TableRefToTableSpec()); - } - /** * A formatting function that maps a TableRow to itself. This allows sending a @@ -556,7 +408,7 @@ public class BigQueryIO { * Read from table specified by a {@link TableReference}. 
*/ public Read from(TableReference table) { - return from(StaticValueProvider.of(toTableSpec(table))); + return from(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table))); } private static final String QUERY_VALIDATION_FAILURE_ERROR = @@ -672,7 +524,7 @@ public class BigQueryIO { @Override public PCollection<TableRow> expand(PBegin input) { - String stepUuid = randomUUIDString(); + String stepUuid = BigQueryHelpers.randomUUIDString(); BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); ValueProvider<String> jobUuid = NestedValueProvider.of( StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid)); @@ -752,7 +604,7 @@ public class BigQueryIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder - .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider())) + .addIfNotNull(DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider())) .withLabel("Table")) .addIfNotNull(DisplayData.item("query", getQuery()) .withLabel("Query")) @@ -787,7 +639,7 @@ public class BigQueryIO { TableReference tableRef = table.get(); tableRef.setProjectId(bqOptions.getProject()); return NestedValueProvider.of(StaticValueProvider.of( - toJsonString(tableRef)), new JsonTableRefToTableRef()); + BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef()); } return table; } @@ -810,541 +662,15 @@ public class BigQueryIO { } } - /** - * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection} - * has been processed. 
- */ - @VisibleForTesting - static class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> { - - private CleanupOperation cleanupOperation; - - PassThroughThenCleanup(CleanupOperation cleanupOperation) { - this.cleanupOperation = cleanupOperation; - } - - @Override - public PCollection<T> expand(PCollection<T> input) { - TupleTag<T> mainOutput = new TupleTag<>(); - TupleTag<Void> cleanupSignal = new TupleTag<>(); - PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>()) - .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal))); - - PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal) - .setCoder(VoidCoder.of()) - .apply(View.<Void>asSingleton().withDefaultValue(null)); - - input.getPipeline() - .apply("Create(CleanupOperation)", Create.of(cleanupOperation)) - .apply("Cleanup", ParDo.of( - new DoFn<CleanupOperation, Void>() { - @ProcessElement - public void processElement(ProcessContext c) - throws Exception { - c.element().cleanup(c.getPipelineOptions()); - } - }).withSideInputs(cleanupSignalView)); - - return outputs.get(mainOutput); - } - - private static class IdentityFn<T> extends DoFn<T, T> { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element()); - } - } - - abstract static class CleanupOperation implements Serializable { - abstract void cleanup(PipelineOptions options) throws Exception; - } - } - - /** - * A {@link BigQuerySourceBase} for reading BigQuery tables. 
- */ - @VisibleForTesting - static class BigQueryTableSource extends BigQuerySourceBase { - - static BigQueryTableSource create( - ValueProvider<String> jobIdToken, - ValueProvider<TableReference> table, - String extractDestinationDir, - BigQueryServices bqServices, - ValueProvider<String> executingProject) { - return new BigQueryTableSource( - jobIdToken, table, extractDestinationDir, bqServices, executingProject); - } - - private final ValueProvider<String> jsonTable; - private final AtomicReference<Long> tableSizeBytes; - - private BigQueryTableSource( - ValueProvider<String> jobIdToken, - ValueProvider<TableReference> table, - String extractDestinationDir, - BigQueryServices bqServices, - ValueProvider<String> executingProject) { - super(jobIdToken, extractDestinationDir, bqServices, executingProject); - this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson()); - this.tableSizeBytes = new AtomicReference<>(); - } - - @Override - protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException { - checkState(jsonTable.isAccessible()); - return JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); - } - - @Override - public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - checkState(jsonTable.isAccessible()); - TableReference tableRef = JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); - return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef)); - } - - @Override - public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - if (tableSizeBytes.get() == null) { - TableReference table = JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); - - Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class)) - .getTable(table).getNumBytes(); - tableSizeBytes.compareAndSet(null, numBytes); - } - return 
tableSizeBytes.get(); - } - - @Override - protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { - // Do nothing. - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("table", jsonTable)); - } - } - - /** - * A {@link BigQuerySourceBase} for querying BigQuery tables. - */ - @VisibleForTesting - static class BigQueryQuerySource extends BigQuerySourceBase { - - static BigQueryQuerySource create( - ValueProvider<String> jobIdToken, - ValueProvider<String> query, - ValueProvider<TableReference> queryTempTableRef, - Boolean flattenResults, - Boolean useLegacySql, - String extractDestinationDir, - BigQueryServices bqServices) { - return new BigQueryQuerySource( - jobIdToken, - query, - queryTempTableRef, - flattenResults, - useLegacySql, - extractDestinationDir, - bqServices); - } - - private final ValueProvider<String> query; - private final ValueProvider<String> jsonQueryTempTable; - private final Boolean flattenResults; - private final Boolean useLegacySql; - private transient AtomicReference<JobStatistics> dryRunJobStats; - - private BigQueryQuerySource( - ValueProvider<String> jobIdToken, - ValueProvider<String> query, - ValueProvider<TableReference> queryTempTableRef, - Boolean flattenResults, - Boolean useLegacySql, - String extractDestinationDir, - BigQueryServices bqServices) { - super(jobIdToken, extractDestinationDir, bqServices, - NestedValueProvider.of( - checkNotNull(queryTempTableRef, "queryTempTableRef"), new TableRefToProjectId())); - this.query = checkNotNull(query, "query"); - this.jsonQueryTempTable = NestedValueProvider.of( - queryTempTableRef, new TableRefToJson()); - this.flattenResults = checkNotNull(flattenResults, "flattenResults"); - this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql"); - this.dryRunJobStats = new AtomicReference<>(); - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) 
throws Exception { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed(); - } - - @Override - public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - return new BigQueryReader(this, bqServices.getReaderFromQuery( - bqOptions, executingProject.get(), createBasicQueryConfig())); - } - - @Override - protected TableReference getTableToExtract(BigQueryOptions bqOptions) - throws IOException, InterruptedException { - // 1. Find the location of the query. - String location = null; - List<TableReference> referencedTables = - dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables(); - DatasetService tableService = bqServices.getDatasetService(bqOptions); - if (referencedTables != null && !referencedTables.isEmpty()) { - TableReference queryTable = referencedTables.get(0); - location = tableService.getTable(queryTable).getLocation(); - } - - // 2. Create the temporary dataset in the query location. - TableReference tableToExtract = - JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class); - tableService.createDataset( - tableToExtract.getProjectId(), - tableToExtract.getDatasetId(), - location, - "Dataset for BigQuery query job temporary table"); - - // 3. Execute the query. 
- String queryJobId = jobIdToken.get() + "-query"; - executeQuery( - executingProject.get(), - queryJobId, - tableToExtract, - bqServices.getJobService(bqOptions)); - return tableToExtract; - } - - @Override - protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { - checkState(jsonQueryTempTable.isAccessible()); - TableReference tableToRemove = - JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class); - - DatasetService tableService = bqServices.getDatasetService(bqOptions); - tableService.deleteTable(tableToRemove); - tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId()); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("query", query)); - } - - private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) - throws InterruptedException, IOException { - if (dryRunJobStats.get() == null) { - JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery( - executingProject.get(), createBasicQueryConfig()); - dryRunJobStats.compareAndSet(null, jobStats); - } - return dryRunJobStats.get(); - } - - private void executeQuery( - String executingProject, - String jobId, - TableReference destinationTable, - JobService jobService) throws IOException, InterruptedException { - JobReference jobRef = new JobReference() - .setProjectId(executingProject) - .setJobId(jobId); - - JobConfigurationQuery queryConfig = createBasicQueryConfig() - .setAllowLargeResults(true) - .setCreateDisposition("CREATE_IF_NEEDED") - .setDestinationTable(destinationTable) - .setPriority("BATCH") - .setWriteDisposition("WRITE_EMPTY"); - - jobService.startQueryJob(jobRef, queryConfig); - Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); - if (parseStatus(job) != Status.SUCCEEDED) { - throw new IOException(String.format( - "Query job %s failed, status: %s.", jobId, 
statusToPrettyString(job.getStatus()))); - } - } - - private JobConfigurationQuery createBasicQueryConfig() { - return new JobConfigurationQuery() - .setFlattenResults(flattenResults) - .setQuery(query.get()) - .setUseLegacySql(useLegacySql); - } - - private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { - in.defaultReadObject(); - dryRunJobStats = new AtomicReference<>(); - } - } - - /** - * An abstract {@link BoundedSource} to read a table from BigQuery. - * - * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then - * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource}, - * and {@link BigQueryQuerySource}, depending on the configuration of the read. - * Specifically, - * <ul> - * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li> - * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li> - * </ul> - * ... - */ - private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> { - // The maximum number of retries to poll a BigQuery job. 
- protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - - protected final ValueProvider<String> jobIdToken; - protected final String extractDestinationDir; - protected final BigQueryServices bqServices; - protected final ValueProvider<String> executingProject; - - private BigQuerySourceBase( - ValueProvider<String> jobIdToken, - String extractDestinationDir, - BigQueryServices bqServices, - ValueProvider<String> executingProject) { - this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken"); - this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir"); - this.bqServices = checkNotNull(bqServices, "bqServices"); - this.executingProject = checkNotNull(executingProject, "executingProject"); - } - - @Override - public List<BoundedSource<TableRow>> splitIntoBundles( - long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - TableReference tableToExtract = getTableToExtract(bqOptions); - JobService jobService = bqServices.getJobService(bqOptions); - String extractJobId = getExtractJobId(jobIdToken); - List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService); - - TableSchema tableSchema = bqServices.getDatasetService(bqOptions) - .getTable(tableToExtract).getSchema(); - - cleanupTempResource(bqOptions); - return createSources(tempFiles, tableSchema); - } - - protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception; - - protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception; - - @Override - public void validate() { - // Do nothing, validation is done in BigQuery.Read. 
- } - - @Override - public Coder<TableRow> getDefaultOutputCoder() { - return TableRowJsonCoder.of(); - } - - private List<String> executeExtract( - String jobId, TableReference table, JobService jobService) - throws InterruptedException, IOException { - JobReference jobRef = new JobReference() - .setProjectId(executingProject.get()) - .setJobId(jobId); - - String destinationUri = getExtractDestinationUri(extractDestinationDir); - JobConfigurationExtract extract = new JobConfigurationExtract() - .setSourceTable(table) - .setDestinationFormat("AVRO") - .setDestinationUris(ImmutableList.of(destinationUri)); - - LOG.info("Starting BigQuery extract job: {}", jobId); - jobService.startExtractJob(jobRef, extract); - Job extractJob = - jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); - if (parseStatus(extractJob) != Status.SUCCEEDED) { - throw new IOException(String.format( - "Extract job %s failed, status: %s.", - extractJob.getJobReference().getJobId(), statusToPrettyString(extractJob.getStatus()))); - } - - List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob); - return ImmutableList.copyOf(tempFiles); - } - - private List<BoundedSource<TableRow>> createSources( - List<String> files, TableSchema tableSchema) throws IOException, InterruptedException { - final String jsonSchema = JSON_FACTORY.toString(tableSchema); - - SerializableFunction<GenericRecord, TableRow> function = - new SerializableFunction<GenericRecord, TableRow>() { - @Override - public TableRow apply(GenericRecord input) { - return BigQueryAvroUtils.convertGenericRecordToTableRow( - input, fromJsonString(jsonSchema, TableSchema.class)); - }}; - - List<BoundedSource<TableRow>> avroSources = Lists.newArrayList(); - for (String fileName : files) { - avroSources.add(new TransformingSource<>( - AvroSource.from(fileName), function, getDefaultOutputCoder())); - } - return ImmutableList.copyOf(avroSources); - } - - protected static class BigQueryReader extends 
BoundedSource.BoundedReader<TableRow> { - private final BigQuerySourceBase source; - private final BigQueryServices.BigQueryJsonReader reader; - - private BigQueryReader( - BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) { - this.source = source; - this.reader = reader; - } - - @Override - public BoundedSource<TableRow> getCurrentSource() { - return source; - } - - @Override - public boolean start() throws IOException { - return reader.start(); - } - - @Override - public boolean advance() throws IOException { - return reader.advance(); - } - - @Override - public TableRow getCurrent() throws NoSuchElementException { - return reader.getCurrent(); - } - - @Override - public void close() throws IOException { - reader.close(); - } - } - } - - /** - * A {@link BoundedSource} that reads from {@code BoundedSource<T>} - * and transforms elements to type {@code V}. - */ - @VisibleForTesting - static class TransformingSource<T, V> extends BoundedSource<V> { - private final BoundedSource<T> boundedSource; - private final SerializableFunction<T, V> function; - private final Coder<V> outputCoder; - - TransformingSource( - BoundedSource<T> boundedSource, - SerializableFunction<T, V> function, - Coder<V> outputCoder) { - this.boundedSource = checkNotNull(boundedSource, "boundedSource"); - this.function = checkNotNull(function, "function"); - this.outputCoder = checkNotNull(outputCoder, "outputCoder"); - } - - @Override - public List<? 
extends BoundedSource<V>> splitIntoBundles( - long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - return Lists.transform( - boundedSource.splitIntoBundles(desiredBundleSizeBytes, options), - new Function<BoundedSource<T>, BoundedSource<V>>() { - @Override - public BoundedSource<V> apply(BoundedSource<T> input) { - return new TransformingSource<>(input, function, outputCoder); - } - }); - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return boundedSource.getEstimatedSizeBytes(options); - } - - @Override - public BoundedReader<V> createReader(PipelineOptions options) throws IOException { - return new TransformingReader(boundedSource.createReader(options)); - } - - @Override - public void validate() { - boundedSource.validate(); - } - - @Override - public Coder<V> getDefaultOutputCoder() { - return outputCoder; - } - - private class TransformingReader extends BoundedReader<V> { - private final BoundedReader<T> boundedReader; - - private TransformingReader(BoundedReader<T> boundedReader) { - this.boundedReader = checkNotNull(boundedReader, "boundedReader"); - } - - @Override - public synchronized BoundedSource<V> getCurrentSource() { - return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder); - } - - @Override - public boolean start() throws IOException { - return boundedReader.start(); - } - - @Override - public boolean advance() throws IOException { - return boundedReader.advance(); - } - - @Override - public V getCurrent() throws NoSuchElementException { - T current = boundedReader.getCurrent(); - return function.apply(current); - } - - @Override - public void close() throws IOException { - boundedReader.close(); - } - - @Override - public synchronized BoundedSource<V> splitAtFraction(double fraction) { - BoundedSource<T> split = boundedReader.splitAtFraction(fraction); - return split == null ? 
null : new TransformingSource<>(split, function, outputCoder); - } - - @Override - public Double getFractionConsumed() { - return boundedReader.getFractionConsumed(); - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return boundedReader.getCurrentTimestamp(); - } - } - } - - private static String getExtractJobId(ValueProvider<String> jobIdToken) { + static String getExtractJobId(ValueProvider<String> jobIdToken) { return jobIdToken.get() + "-extract"; } - private static String getExtractDestinationUri(String extractDestinationDir) { + static String getExtractDestinationUri(String extractDestinationDir) { return String.format("%s/%s", extractDestinationDir, "*.avro"); } - private static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob) + static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob) throws IOException { JobStatistics jobStats = extractJob.getStatistics(); List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts(); @@ -1538,7 +864,8 @@ public class BigQueryIO { } /** - * Writes to the given table, specified in the format described in {@link #parseTableSpec}. + * Writes to the given table, specified in the format described in + * {@link BigQueryHelpers#parseTableSpec}. */ public Write<T> to(String tableSpec) { return to(StaticValueProvider.of(tableSpec)); @@ -1546,7 +873,7 @@ public class BigQueryIO { /** Writes to the given table, specified as a {@link TableReference}. */ public Write<T> to(TableReference table) { - return to(StaticValueProvider.of(toTableSpec(table))); + return to(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table))); } /** Same as {@link #to(String)}, but with a {@link ValueProvider}. 
*/ @@ -1596,7 +923,7 @@ public class BigQueryIO { @Override public TableReference apply(ValueInSingleWindow<T> value) { - return parseTableSpec(tableSpecFunction.apply(value)); + return BigQueryHelpers.parseTableSpec(tableSpecFunction.apply(value)); } } @@ -1609,7 +936,7 @@ public class BigQueryIO { */ public Write<T> withSchema(TableSchema schema) { return toBuilder() - .setJsonSchema(StaticValueProvider.of(toJsonString(schema))) + .setJsonSchema(StaticValueProvider.of(BigQueryHelpers.toJsonString(schema))) .build(); } @@ -1655,7 +982,7 @@ public class BigQueryIO { checkState( datasetService.isTableEmpty(tableRef), "BigQuery table is not empty: %s.", - BigQueryIO.toTableSpec(tableRef)); + BigQueryHelpers.toTableSpec(tableRef)); } } catch (IOException | InterruptedException e) { if (e instanceof InterruptedException) { @@ -1663,7 +990,7 @@ public class BigQueryIO { } throw new RuntimeException( "unable to confirm BigQuery table emptiness for table " - + BigQueryIO.toTableSpec(tableRef), e); + + BigQueryHelpers.toTableSpec(tableRef), e); } } @@ -1758,7 +1085,7 @@ public class BigQueryIO { ValueProvider<TableReference> table = getTableWithDefaultProject(options); - final String stepUuid = randomUUIDString(); + final String stepUuid = BigQueryHelpers.randomUUIDString(); String tempLocation = options.getTempLocation(); String tempFilePrefix; @@ -1953,7 +1280,7 @@ public class BigQueryIO { /** Returns the table schema. */ public TableSchema getSchema() { - return fromJsonString( + return BigQueryHelpers.fromJsonString( getJsonSchema() == null ? 
null : getJsonSchema().get(), TableSchema.class); } @@ -1979,7 +1306,7 @@ public class BigQueryIO { TableReference tableRef = table.get(); tableRef.setProjectId(bqOptions.getProject()); return NestedValueProvider.of(StaticValueProvider.of( - toJsonString(tableRef)), new JsonTableRefToTableRef()); + BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef()); } return table; } @@ -2128,7 +1455,8 @@ public class BigQueryIO { List<String> partition = Lists.newArrayList(c.element().getValue()).get(0); String jobIdPrefix = String.format( c.sideInput(jobIdToken) + "_%05d", c.element().getKey()); - TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class); + TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(), + TableReference.class); if (!singlePartition) { ref.setTableId(jobIdPrefix); } @@ -2138,13 +1466,13 @@ public class BigQueryIO { bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, ref, - fromJsonString( + BigQueryHelpers.fromJsonString( jsonSchema == null ? 
null : jsonSchema.get(), TableSchema.class), partition, writeDisposition, createDisposition, tableDescription); - c.output(toJsonString(ref)); + c.output(BigQueryHelpers.toJsonString(ref)); removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition); } @@ -2176,7 +1504,7 @@ public class BigQueryIO { .setJobId(jobId); jobService.startLoadJob(jobRef, loadConfig); Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); - Status jobStatus = parseStatus(loadJob); + Status jobStatus = BigQueryHelpers.parseStatus(loadJob); switch (jobStatus) { case SUCCEEDED: if (tableDescription != null) { @@ -2185,14 +1513,15 @@ public class BigQueryIO { return; case UNKNOWN: throw new RuntimeException(String.format( - "UNKNOWN status of load job [%s]: %s.", jobId, jobToPrettyString(loadJob))); + "UNKNOWN status of load job [%s]: %s.", jobId, + BigQueryHelpers.jobToPrettyString(loadJob))); case FAILED: lastFailedLoadJob = loadJob; continue; default: throw new IllegalStateException(String.format( "Unexpected status [%s] of load job: %s.", - jobStatus, jobToPrettyString(loadJob))); + jobStatus, BigQueryHelpers.jobToPrettyString(loadJob))); } } throw new RuntimeException(String.format( @@ -2200,7 +1529,7 @@ public class BigQueryIO { + "reached max retries: %d, last failed load job: %s.", jobIdPrefix, Write.MAX_RETRY_JOBS, - jobToPrettyString(lastFailedLoadJob))); + BigQueryHelpers.jobToPrettyString(lastFailedLoadJob))); } static void removeTemporaryFiles( @@ -2281,13 +1610,13 @@ public class BigQueryIO { List<TableReference> tempTables = Lists.newArrayList(); for (String table : tempTablesJson) { - tempTables.add(fromJsonString(table, TableReference.class)); + tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class)); } copy( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), c.sideInput(jobIdToken), - fromJsonString(jsonTableRef.get(), 
TableReference.class), + BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class), tempTables, writeDisposition, createDisposition, @@ -2322,7 +1651,7 @@ public class BigQueryIO { .setJobId(jobId); jobService.startCopyJob(jobRef, copyConfig); Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); - Status jobStatus = parseStatus(copyJob); + Status jobStatus = BigQueryHelpers.parseStatus(copyJob); switch (jobStatus) { case SUCCEEDED: if (tableDescription != null) { @@ -2331,14 +1660,15 @@ public class BigQueryIO { return; case UNKNOWN: throw new RuntimeException(String.format( - "UNKNOWN status of copy job [%s]: %s.", jobId, jobToPrettyString(copyJob))); + "UNKNOWN status of copy job [%s]: %s.", jobId, + BigQueryHelpers.jobToPrettyString(copyJob))); case FAILED: lastFailedCopyJob = copyJob; continue; default: throw new IllegalStateException(String.format( "Unexpected status [%s] of load job: %s.", - jobStatus, jobToPrettyString(copyJob))); + jobStatus, BigQueryHelpers.jobToPrettyString(copyJob))); } } throw new RuntimeException(String.format( @@ -2346,17 +1676,17 @@ public class BigQueryIO { + "reached max retries: %d, last failed copy job: %s.", jobIdPrefix, Write.MAX_RETRY_JOBS, - jobToPrettyString(lastFailedCopyJob))); + BigQueryHelpers.jobToPrettyString(lastFailedCopyJob))); } static void removeTemporaryTables(DatasetService tableService, List<TableReference> tempTables) { for (TableReference tableRef : tempTables) { try { - LOG.debug("Deleting table {}", toJsonString(tableRef)); + LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef)); tableService.deleteTable(tableRef); } catch (Exception e) { - LOG.warn("Failed to delete the table {}", toJsonString(tableRef), e); + LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e); } } } @@ -2376,14 +1706,6 @@ public class BigQueryIO { } } - private static String jobToPrettyString(@Nullable Job job) throws IOException { - return job == null 
? "null" : job.toPrettyString(); - } - - private static String statusToPrettyString(@Nullable JobStatus status) throws IOException { - return status == null ? "Unknown status: null." : status.toPrettyString(); - } - private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) { try { datasetService.getDataset(table.getProjectId(), table.getDatasetId()); @@ -2391,14 +1713,14 @@ public class BigQueryIO { ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { throw new IllegalArgumentException( - String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)), + String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryHelpers.toTableSpec(table)), e); } else if (e instanceof RuntimeException) { throw (RuntimeException) e; } else { throw new RuntimeException( String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", - BigQueryIO.toTableSpec(table)), + BigQueryHelpers.toTableSpec(table)), e); } } @@ -2411,13 +1733,14 @@ public class BigQueryIO { ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { throw new IllegalArgumentException( - String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)), e); + String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryHelpers.toTableSpec(table)), + e); } else if (e instanceof RuntimeException) { throw (RuntimeException) e; } else { throw new RuntimeException( String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", - BigQueryIO.toTableSpec(table)), + BigQueryHelpers.toTableSpec(table)), e); } } @@ -2492,8 +1815,9 @@ public class BigQueryIO { @ProcessElement public void processElement(ProcessContext context) { String tableSpec = context.element().getKey().getKey(); - List<TableRow> rows = getOrCreateMapListValue(tableRows, tableSpec); - List<String> uniqueIds = 
getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec); + List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec); + List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, + tableSpec); rows.add(context.element().getValue().tableRow); uniqueIds.add(context.element().getValue().uniqueId); @@ -2526,7 +1850,7 @@ public class BigQueryIO { public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) throws InterruptedException, IOException { - TableReference tableReference = parseTableSpec(tableSpec); + TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec); if (createDisposition != createDisposition.CREATE_NEVER && !createdTables.contains(tableSpec)) { synchronized (createdTables) { @@ -2723,7 +2047,7 @@ public class BigQueryIO { TableReference tableRef = table.get() .setProjectId(options.as(BigQueryOptions.class).getProject()); table = NestedValueProvider.of( - StaticValueProvider.of(toJsonString(tableRef)), + StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef()); } this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec()); @@ -2779,7 +2103,7 @@ public class BigQueryIO { if (table.getProjectId() == null) { table.setProjectId(options.getProject()); } - return toTableSpec(table); + return BigQueryHelpers.toTableSpec(table); } } } @@ -2858,68 +2182,9 @@ public class BigQueryIO { UNKNOWN, } - private static Status parseStatus(@Nullable Job job) { - if (job == null) { - return Status.UNKNOWN; - } - JobStatus status = job.getStatus(); - if (status.getErrorResult() != null) { - return Status.FAILED; - } else if (status.getErrors() != null && !status.getErrors().isEmpty()) { - return Status.FAILED; - } else { - return Status.SUCCEEDED; - } - } - - @VisibleForTesting - static String toJsonString(Object item) { - if (item == null) { - return null; - } - try { - return JSON_FACTORY.toString(item); - } catch (IOException 
e) { - throw new RuntimeException( - String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()), - e); - } - } - - @VisibleForTesting - static <T> T fromJsonString(String json, Class<T> clazz) { - if (json == null) { - return null; - } - try { - return JSON_FACTORY.fromString(json, clazz); - } catch (IOException e) { - throw new RuntimeException( - String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json), - e); - } - } - - /** - * Returns a randomUUID string. - * - * <p>{@code '-'} is removed because BigQuery doesn't allow it in dataset id. - */ - private static String randomUUIDString() { - return UUID.randomUUID().toString().replaceAll("-", ""); - } - ///////////////////////////////////////////////////////////////////////////// /** Disallow construction of utility class. */ private BigQueryIO() {} - private static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) { - List<V> value = map.get(key); - if (value == null) { - value = new ArrayList<>(); - map.put(key, value); - } - return value; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java new file mode 100644 index 0000000..a909957 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java @@ -0,0 +1,186 @@ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.services.bigquery.model.Job; +import 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;

/**
 * A {@link BigQuerySourceBase} for querying BigQuery tables.
 *
 * <p>The query is dry-run once (and the statistics cached) to estimate its size and to find the
 * location of the tables it references. At execution time the query is run as a BATCH job into a
 * temporary table inside a freshly created temporary dataset; the base class then exports that
 * table to Avro files on GCS, and {@link #cleanupTempResource} deletes both the table and the
 * dataset afterwards.
 */
@VisibleForTesting
class BigQueryQuerySource extends BigQuerySourceBase {

  /** Static factory mirroring the private constructor; all arguments are required. */
  static BigQueryQuerySource create(
      ValueProvider<String> jobIdToken,
      ValueProvider<String> query,
      ValueProvider<TableReference> queryTempTableRef,
      Boolean flattenResults,
      Boolean useLegacySql,
      String extractDestinationDir,
      BigQueryServices bqServices) {
    return new BigQueryQuerySource(
        jobIdToken,
        query,
        queryTempTableRef,
        flattenResults,
        useLegacySql,
        extractDestinationDir,
        bqServices);
  }

  private final ValueProvider<String> query;
  // The temporary destination table, kept in its JSON form so this source stays serializable.
  private final ValueProvider<String> jsonQueryTempTable;
  private final Boolean flattenResults;
  private final Boolean useLegacySql;
  // Cached dry-run statistics; transient, rebuilt empty on deserialization (see readObject).
  private transient AtomicReference<JobStatistics> dryRunJobStats;

  private BigQueryQuerySource(
      ValueProvider<String> jobIdToken,
      ValueProvider<String> query,
      ValueProvider<TableReference> queryTempTableRef,
      Boolean flattenResults,
      Boolean useLegacySql,
      String extractDestinationDir,
      BigQueryServices bqServices) {
    // The executing project is derived from the temp table's project.
    super(jobIdToken, extractDestinationDir, bqServices,
        NestedValueProvider.of(
            checkNotNull(queryTempTableRef, "queryTempTableRef"), new TableRefToProjectId()));
    this.query = checkNotNull(query, "query");
    this.jsonQueryTempTable = NestedValueProvider.of(
        queryTempTableRef, new TableRefToJson());
    this.flattenResults = checkNotNull(flattenResults, "flattenResults");
    this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
    this.dryRunJobStats = new AtomicReference<>();
  }

  /** Returns the total bytes the query would process, obtained from a (cached) dry run. */
  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
    return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
  }

  @Override
  public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
    return new BigQueryReader(this, bqServices.getReaderFromQuery(
        bqOptions, executingProject.get(), createBasicQueryConfig()));
  }

  /**
   * Runs the query into a temporary table and returns a reference to that table, which the
   * base class will export to GCS.
   */
  @Override
  protected TableReference getTableToExtract(BigQueryOptions bqOptions)
      throws IOException, InterruptedException {
    // Guard accessibility up front (consistent with cleanupTempResource) so a
    // pipeline-construction-time call fails with a clear IllegalStateException.
    checkState(jsonQueryTempTable.isAccessible());

    // 1. Find the location of the query: use the location of the first referenced table, if any.
    String location = null;
    List<TableReference> referencedTables =
        dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
    DatasetService tableService = bqServices.getDatasetService(bqOptions);
    if (referencedTables != null && !referencedTables.isEmpty()) {
      TableReference queryTable = referencedTables.get(0);
      location = tableService.getTable(queryTable).getLocation();
    }

    // 2. Create the temporary dataset in the query location.
    TableReference tableToExtract =
        BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
    tableService.createDataset(
        tableToExtract.getProjectId(),
        tableToExtract.getDatasetId(),
        location,
        "Dataset for BigQuery query job temporary table");

    // 3. Execute the query, writing its results into the temporary table.
    String queryJobId = jobIdToken.get() + "-query";
    executeQuery(
        executingProject.get(),
        queryJobId,
        tableToExtract,
        bqServices.getJobService(bqOptions));
    return tableToExtract;
  }

  /** Deletes the temporary table and the temporary dataset created by the query job. */
  @Override
  protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
    checkState(jsonQueryTempTable.isAccessible());
    TableReference tableToRemove =
        BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);

    DatasetService tableService = bqServices.getDatasetService(bqOptions);
    tableService.deleteTable(tableToRemove);
    tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder.add(DisplayData.item("query", query));
  }

  /**
   * Dry-runs the query at most once per JVM instance of this source; subsequent calls return
   * the cached {@link JobStatistics}.
   */
  private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
      throws InterruptedException, IOException {
    if (dryRunJobStats.get() == null) {
      JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
          executingProject.get(), createBasicQueryConfig());
      dryRunJobStats.compareAndSet(null, jobStats);
    }
    return dryRunJobStats.get();
  }

  /**
   * Starts a BATCH query job writing into {@code destinationTable} and polls it to completion.
   *
   * @throws IOException if the job completes with a non-SUCCEEDED status
   */
  private void executeQuery(
      String executingProject,
      String jobId,
      TableReference destinationTable,
      JobService jobService) throws IOException, InterruptedException {
    JobReference jobRef = new JobReference()
        .setProjectId(executingProject)
        .setJobId(jobId);

    JobConfigurationQuery queryConfig = createBasicQueryConfig()
        .setAllowLargeResults(true)
        .setCreateDisposition("CREATE_IF_NEEDED")
        .setDestinationTable(destinationTable)
        .setPriority("BATCH")
        .setWriteDisposition("WRITE_EMPTY");

    jobService.startQueryJob(jobRef, queryConfig);
    Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
    if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
      throw new IOException(String.format(
          "Query job %s failed, status: %s.", jobId,
          BigQueryHelpers.statusToPrettyString(job.getStatus())));
    }
  }

  /** Shared base configuration for both the dry run and the real query job. */
  private JobConfigurationQuery createBasicQueryConfig() {
    return new JobConfigurationQuery()
        .setFlattenResults(flattenResults)
        .setQuery(query.get())
        .setUseLegacySql(useLegacySql);
  }

  // Restore the transient dry-run cache after Java deserialization.
  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
    in.defaultReadObject();
    dryRunJobStats = new AtomicReference<>();
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An abstract {@link BoundedSource} to read a table from BigQuery.
 *
 * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then
 * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource},
 * and {@link BigQueryQuerySource}, depending on the configuration of the read.
 * Specifically,
 * <ul>
 * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li>
 * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li>
 * </ul>
 * ...
 */
abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
  private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceBase.class);

  // The maximum number of retries to poll a BigQuery job.
  protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;

  protected final ValueProvider<String> jobIdToken;
  protected final String extractDestinationDir;
  protected final BigQueryServices bqServices;
  protected final ValueProvider<String> executingProject;

  BigQuerySourceBase(
      ValueProvider<String> jobIdToken,
      String extractDestinationDir,
      BigQueryServices bqServices,
      ValueProvider<String> executingProject) {
    this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
    this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
    this.bqServices = checkNotNull(bqServices, "bqServices");
    this.executingProject = checkNotNull(executingProject, "executingProject");
  }

  /**
   * Resolves the table to read (subclass-specific), exports it to Avro files on GCS via a
   * BigQuery extract job, cleans up any temporary resources, and returns one sub-source per
   * exported file.
   */
  @Override
  public List<BoundedSource<TableRow>> splitIntoBundles(
      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
    TableReference tableToExtract = getTableToExtract(bqOptions);
    JobService jobService = bqServices.getJobService(bqOptions);
    String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
    List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);

    // Fetch the schema before cleanup: a query source deletes its temp table during cleanup.
    TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
        .getTable(tableToExtract).getSchema();

    cleanupTempResource(bqOptions);
    return createSources(tempFiles, tableSchema);
  }

  /** Returns the concrete table to export; may run a query job to materialize it. */
  protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;

  /** Deletes any temporary tables/datasets created by {@link #getTableToExtract}. */
  protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;

  @Override
  public void validate() {
    // Do nothing, validation is done in BigQueryIO.Read.
  }

  @Override
  public Coder<TableRow> getDefaultOutputCoder() {
    return TableRowJsonCoder.of();
  }

  /**
   * Starts an Avro extract job for {@code table} and polls it to completion, returning the GCS
   * paths of the produced files.
   *
   * @throws IOException if the extract job completes with a non-SUCCEEDED status
   */
  private List<String> executeExtract(
      String jobId, TableReference table, JobService jobService)
      throws InterruptedException, IOException {
    JobReference jobRef = new JobReference()
        .setProjectId(executingProject.get())
        .setJobId(jobId);

    String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
    JobConfigurationExtract extract = new JobConfigurationExtract()
        .setSourceTable(table)
        .setDestinationFormat("AVRO")
        .setDestinationUris(ImmutableList.of(destinationUri));

    LOG.info("Starting BigQuery extract job: {}", jobId);
    jobService.startExtractJob(jobRef, extract);
    Job extractJob =
        jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
    if (BigQueryHelpers.parseStatus(extractJob) != Status.SUCCEEDED) {
      throw new IOException(String.format(
          "Extract job %s failed, status: %s.",
          extractJob.getJobReference().getJobId(),
          BigQueryHelpers.statusToPrettyString(extractJob.getStatus())));
    }

    List<String> tempFiles = BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
    return ImmutableList.copyOf(tempFiles);
  }

  /** Wraps each exported Avro file in a source that converts its records to {@link TableRow}s. */
  private List<BoundedSource<TableRow>> createSources(
      List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
    // TableSchema is not java.io.Serializable, so ship it as a JSON string and deserialize
    // it lazily on the worker.
    final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);

    SerializableFunction<GenericRecord, TableRow> function =
        new SerializableFunction<GenericRecord, TableRow>() {
          // Parsed at most once per deserialized instance, instead of once per record.
          private transient TableSchema schema;

          @Override
          public TableRow apply(GenericRecord input) {
            if (schema == null) {
              schema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
            }
            return BigQueryAvroUtils.convertGenericRecordToTableRow(input, schema);
          }
        };

    List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
    for (String fileName : files) {
      avroSources.add(new TransformingSource<>(
          AvroSource.from(fileName), function, getDefaultOutputCoder()));
    }
    return ImmutableList.copyOf(avroSources);
  }

  /** A {@link BoundedReader} that delegates to a {@link BigQueryServices.BigQueryJsonReader}. */
  protected static class BigQueryReader extends BoundedReader<TableRow> {
    private final BigQuerySourceBase source;
    private final BigQueryServices.BigQueryJsonReader reader;

    BigQueryReader(
        BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
      this.source = source;
      this.reader = reader;
    }

    @Override
    public BoundedSource<TableRow> getCurrentSource() {
      return source;
    }

    @Override
    public boolean start() throws IOException {
      return reader.start();
    }

    @Override
    public boolean advance() throws IOException {
      return reader.advance();
    }

    @Override
    public TableRow getCurrent() throws NoSuchElementException {
      return reader.getCurrent();
    }

    @Override
    public void close() throws IOException {
      reader.close();
    }
  }
}
java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.display.DisplayData; + +/** + * A {@link BigQuerySourceBase} for reading BigQuery tables. + */ +@VisibleForTesting +class BigQueryTableSource extends BigQuerySourceBase { + + static BigQueryTableSource create( + ValueProvider<String> jobIdToken, + ValueProvider<TableReference> table, + String extractDestinationDir, + BigQueryServices bqServices, + ValueProvider<String> executingProject) { + return new BigQueryTableSource( + jobIdToken, table, extractDestinationDir, bqServices, executingProject); + } + + private final ValueProvider<String> jsonTable; + private final AtomicReference<Long> tableSizeBytes; + + private BigQueryTableSource( + ValueProvider<String> jobIdToken, + ValueProvider<TableReference> table, + String extractDestinationDir, + BigQueryServices bqServices, + ValueProvider<String> executingProject) { + super(jobIdToken, extractDestinationDir, bqServices, executingProject); + this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson()); + this.tableSizeBytes = new AtomicReference<>(); + } + + @Override + protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException { + checkState(jsonTable.isAccessible()); + return BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); + } + + @Override + public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + checkState(jsonTable.isAccessible()); + TableReference tableRef = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), + 
TableReference.class); + return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef)); + } + + @Override + public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + if (tableSizeBytes.get() == null) { + TableReference table = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), + TableReference.class); + + Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class)) + .getTable(table).getNumBytes(); + tableSizeBytes.compareAndSet(null, numBytes); + } + return tableSizeBytes.get(); + } + + @Override + protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { + // Do nothing. + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("table", jsonTable)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java new file mode 100644 index 0000000..612afbe --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java @@ -0,0 +1,66 @@ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.common.annotations.VisibleForTesting; +import java.io.Serializable; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import 
org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +/** + * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection} + * has been processed. + */ +@VisibleForTesting +class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> { + + private CleanupOperation cleanupOperation; + + PassThroughThenCleanup(CleanupOperation cleanupOperation) { + this.cleanupOperation = cleanupOperation; + } + + @Override + public PCollection<T> expand(PCollection<T> input) { + TupleTag<T> mainOutput = new TupleTag<>(); + TupleTag<Void> cleanupSignal = new TupleTag<>(); + PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>()) + .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal))); + + PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal) + .setCoder(VoidCoder.of()) + .apply(View.<Void>asSingleton().withDefaultValue(null)); + + input.getPipeline() + .apply("Create(CleanupOperation)", Create.of(cleanupOperation)) + .apply("Cleanup", ParDo.of( + new DoFn<CleanupOperation, Void>() { + @ProcessElement + public void processElement(ProcessContext c) + throws Exception { + c.element().cleanup(c.getPipelineOptions()); + } + }).withSideInputs(cleanupSignalView)); + + return outputs.get(mainOutput); + } + + private static class IdentityFn<T> extends DoFn<T, T> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element()); + } + } + + abstract static class CleanupOperation implements Serializable { + abstract void cleanup(PipelineOptions options) throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java 
---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java new file mode 100644 index 0000000..a86adfb --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java @@ -0,0 +1,118 @@ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.joda.time.Instant; + +/** + * A {@link BoundedSource} that reads from {@code BoundedSource<T>} + * and transforms elements to type {@code V}. +*/ +@VisibleForTesting +class TransformingSource<T, V> extends BoundedSource<V> { + private final BoundedSource<T> boundedSource; + private final SerializableFunction<T, V> function; + private final Coder<V> outputCoder; + + TransformingSource( + BoundedSource<T> boundedSource, + SerializableFunction<T, V> function, + Coder<V> outputCoder) { + this.boundedSource = checkNotNull(boundedSource, "boundedSource"); + this.function = checkNotNull(function, "function"); + this.outputCoder = checkNotNull(outputCoder, "outputCoder"); + } + + @Override + public List<? 
extends BoundedSource<V>> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + return Lists.transform( + boundedSource.splitIntoBundles(desiredBundleSizeBytes, options), + new Function<BoundedSource<T>, BoundedSource<V>>() { + @Override + public BoundedSource<V> apply(BoundedSource<T> input) { + return new TransformingSource<>(input, function, outputCoder); + } + }); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return boundedSource.getEstimatedSizeBytes(options); + } + + @Override + public BoundedReader<V> createReader(PipelineOptions options) throws IOException { + return new TransformingReader(boundedSource.createReader(options)); + } + + @Override + public void validate() { + boundedSource.validate(); + } + + @Override + public Coder<V> getDefaultOutputCoder() { + return outputCoder; + } + + private class TransformingReader extends BoundedReader<V> { + private final BoundedReader<T> boundedReader; + + private TransformingReader(BoundedReader<T> boundedReader) { + this.boundedReader = checkNotNull(boundedReader, "boundedReader"); + } + + @Override + public synchronized BoundedSource<V> getCurrentSource() { + return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder); + } + + @Override + public boolean start() throws IOException { + return boundedReader.start(); + } + + @Override + public boolean advance() throws IOException { + return boundedReader.advance(); + } + + @Override + public V getCurrent() throws NoSuchElementException { + T current = boundedReader.getCurrent(); + return function.apply(current); + } + + @Override + public void close() throws IOException { + boundedReader.close(); + } + + @Override + public synchronized BoundedSource<V> splitAtFraction(double fraction) { + BoundedSource<T> split = boundedReader.splitAtFraction(fraction); + return split == null ? 
null : new TransformingSource<>(split, function, outputCoder); + } + + @Override + public Double getFractionConsumed() { + return boundedReader.getFractionConsumed(); + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return boundedReader.getCurrentTimestamp(); + } + } +}
