Repository: beam Updated Branches: refs/heads/master 57f449c4c -> 17f0843eb
Remove job name usages from BigQueryIO at pipeline construction time Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ddf8d49 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ddf8d49 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ddf8d49 Branch: refs/heads/master Commit: 0ddf8d49d94288e693494ac0685b0c6df78dcd3b Parents: 57f449c Author: Vikas Kedigehalli <[email protected]> Authored: Tue May 2 13:55:32 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Wed May 3 15:15:02 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 61 ++++------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 32 +++------ .../io/gcp/bigquery/BigQueryQuerySource.java | 40 +++++------- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 24 +++---- .../io/gcp/bigquery/BigQueryTableSource.java | 15 ++--- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 69 +++++++++++--------- .../sdk/io/gcp/bigquery/FakeJobService.java | 5 +- 7 files changed, 94 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index e04361c..3850cbd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -256,15 +256,6 @@ public class BigQueryHelpers { } } - @VisibleForTesting - static class BeamJobUuidToBigQueryJobUuid - implements SerializableFunction<String, String> { - @Override - public String apply(String from) { - return "beam_job_" + from; - } - } - static class TableSchemaToJsonSchema implements SerializableFunction<TableSchema, String> { @Override @@ -297,14 +288,6 @@ public class BigQueryHelpers { } } - static class TableRefToProjectId - implements SerializableFunction<TableReference, String> { - @Override - public String apply(TableReference from) { - return from.getProjectId(); - } - } - @VisibleForTesting static class TableSpecToTableRef implements SerializableFunction<String, TableReference> { @@ -314,39 +297,21 @@ public class BigQueryHelpers { } } - @VisibleForTesting - static class CreatePerBeamJobUuid - implements SerializableFunction<String, String> { - private final String stepUuid; - - CreatePerBeamJobUuid(String stepUuid) { - this.stepUuid = stepUuid; - } - - @Override - public String apply(String jobUuid) { - return stepUuid + "_" + jobUuid.replaceAll("-", ""); - } + static String createJobIdToken(String jobName, String stepUuid) { + return String.format("beam_job_%s_%s", stepUuid, jobName.replaceAll("-", "")); } - @VisibleForTesting - static class CreateJsonTableRefFromUuid - implements SerializableFunction<String, TableReference> { - private final String executingProject; - - CreateJsonTableRefFromUuid(String executingProject) { - this.executingProject = executingProject; - } + static String getExtractJobId(String jobIdToken) { + return String.format("%s-extract", jobIdToken); + } - @Override - public TableReference apply(String jobUuid) { - String queryTempDatasetId = "temp_dataset_" + jobUuid; - String queryTempTableId = "temp_table_" + jobUuid; - TableReference queryTempTableRef = new TableReference() - .setProjectId(executingProject) - .setDatasetId(queryTempDatasetId) - .setTableId(queryTempTableId); - return queryTempTableRef; - } + static TableReference createTempTableReference(String projectId, String jobUuid) { + String queryTempDatasetId = "temp_dataset_" + jobUuid; + String queryTempTableId = "temp_table_" + jobUuid; + TableReference queryTempTableRef = new TableReference() + .setProjectId(projectId) + .setDatasetId(queryTempDatasetId) + .setTableId(queryTempTableId); + return queryTempTableRef; } } http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index ea97906..2ff5cd7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId; import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.model.Job; @@ -44,9 +46,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreatePerBeamJobUuid; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema; @@ -468,15 +467,9 @@ public class BigQueryIO { @Override public PCollection<TableRow> expand(PBegin input) { - String stepUuid = BigQueryHelpers.randomUUIDString(); + final String stepUuid = BigQueryHelpers.randomUUIDString(); BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - ValueProvider<String> jobUuid = NestedValueProvider.of( - StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid)); - final ValueProvider<String> jobIdToken = NestedValueProvider.of( - jobUuid, new BeamJobUuidToBigQueryJobUuid()); - BoundedSource<TableRow> source; - final String extractDestinationDir; String tempLocation = bqOptions.getTempLocation(); try { @@ -487,15 +480,12 @@ public class BigQueryIO { String.format("Failed to resolve extract destination directory in %s", tempLocation)); } - final String executingProject = bqOptions.getProject(); if (getQuery() != null && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) { source = BigQueryQuerySource.create( - jobIdToken, + stepUuid, getQuery(), - NestedValueProvider.of( - jobUuid, new CreateJsonTableRefFromUuid(executingProject)), getFlattenResults(), getUseLegacySql(), extractDestinationDir, @@ -503,11 +493,10 @@ public class BigQueryIO { } else { source = BigQueryTableSource.create( - jobIdToken, + stepUuid, getTableProvider(), extractDestinationDir, - getBigQueryServices(), - StaticValueProvider.of(executingProject)); + getBigQueryServices()); } PassThroughThenCleanup.CleanupOperation cleanupOperation = new PassThroughThenCleanup.CleanupOperation() { @@ -517,8 +506,9 @@ public class BigQueryIO { JobReference jobRef = new JobReference() - .setProjectId(executingProject) - .setJobId(getExtractJobId(jobIdToken)); + .setProjectId(bqOptions.getProject()) + .setJobId( + getExtractJobId(createJobIdToken(bqOptions.getJobName(), stepUuid))); Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef); @@ -583,10 +573,6 @@ public class BigQueryIO { } } - static String getExtractJobId(ValueProvider<String> jobIdToken) { - return jobIdToken.get() + "-extract"; - } - static String getExtractDestinationUri(String extractDestinationDir) { return String.format("%s/%s", extractDestinationDir, "*.avro"); } http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java index 49da030..205f9cc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java @@ -19,7 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationQuery; @@ -34,13 +35,10 @@ import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -51,17 +49,15 @@ import org.apache.beam.sdk.transforms.display.DisplayData; class BigQueryQuerySource extends BigQuerySourceBase { static BigQueryQuerySource create( - ValueProvider<String> jobIdToken, + String stepUuid, ValueProvider<String> query, - ValueProvider<TableReference> queryTempTableRef, Boolean flattenResults, Boolean useLegacySql, String extractDestinationDir, BigQueryServices bqServices) { return new BigQueryQuerySource( - jobIdToken, + stepUuid, query, - queryTempTableRef, flattenResults, useLegacySql, extractDestinationDir, @@ -69,25 +65,19 @@ class BigQueryQuerySource extends BigQuerySourceBase { } private final ValueProvider<String> query; - private final ValueProvider<String> jsonQueryTempTable; private final Boolean flattenResults; private final Boolean useLegacySql; private transient AtomicReference<JobStatistics> dryRunJobStats; private BigQueryQuerySource( - ValueProvider<String> jobIdToken, + String stepUuid, ValueProvider<String> query, - ValueProvider<TableReference> queryTempTableRef, Boolean flattenResults, Boolean useLegacySql, String extractDestinationDir, BigQueryServices bqServices) { - super(jobIdToken, extractDestinationDir, bqServices, - NestedValueProvider.of( - checkNotNull(queryTempTableRef, "queryTempTableRef"), new TableRefToProjectId())); + super(stepUuid, extractDestinationDir, bqServices); this.query = checkNotNull(query, "query"); - this.jsonQueryTempTable = NestedValueProvider.of( - queryTempTableRef, new TableRefToJson()); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql"); this.dryRunJobStats = new AtomicReference<>(); @@ -103,7 +93,7 @@ class BigQueryQuerySource extends BigQuerySourceBase { public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); return new BigQueryReader(this, bqServices.getReaderFromQuery( - bqOptions, executingProject.get(), createBasicQueryConfig())); + bqOptions, bqOptions.getProject(), createBasicQueryConfig())); } @Override @@ -120,8 +110,9 @@ class BigQueryQuerySource extends BigQuerySourceBase { } // 2. Create the temporary dataset in the query location. - TableReference tableToExtract = - BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class); + TableReference tableToExtract = createTempTableReference( + bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid)); + tableService.createDataset( tableToExtract.getProjectId(), tableToExtract.getDatasetId(), @@ -129,9 +120,9 @@ class BigQueryQuerySource extends BigQuerySourceBase { "Dataset for BigQuery query job temporary table"); // 3. Execute the query. - String queryJobId = jobIdToken.get() + "-query"; + String queryJobId = createJobIdToken(bqOptions.getJobName(), stepUuid) + "-query"; executeQuery( - executingProject.get(), + bqOptions.getProject(), queryJobId, tableToExtract, bqServices.getJobService(bqOptions)); @@ -140,9 +131,8 @@ class BigQueryQuerySource extends BigQuerySourceBase { @Override protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { - checkState(jsonQueryTempTable.isAccessible()); - TableReference tableToRemove = - BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class); + TableReference tableToRemove = createTempTableReference( + bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid)); DatasetService tableService = bqServices.getDatasetService(bqOptions); tableService.deleteTable(tableToRemove); @@ -159,7 +149,7 @@ class BigQueryQuerySource extends BigQuerySourceBase { throws InterruptedException, IOException { if (dryRunJobStats.get() == null) { JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery( - executingProject.get(), createBasicQueryConfig()); + bqOptions.getProject(), createBasicQueryConfig()); dryRunJobStats.compareAndSet(null, jobStats); } return dryRunJobStats.get(); http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index c7a6cca..0171046 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; @@ -38,7 +40,6 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,22 +63,16 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { // The maximum number of retries to poll a BigQuery job. protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - protected final ValueProvider<String> jobIdToken; + protected final String stepUuid; protected final String extractDestinationDir; protected final BigQueryServices bqServices; - protected final ValueProvider<String> executingProject; private transient List<BoundedSource<TableRow>> cachedSplitResult; - BigQuerySourceBase( - ValueProvider<String> jobIdToken, - String extractDestinationDir, - BigQueryServices bqServices, - ValueProvider<String> executingProject) { - this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken"); + BigQuerySourceBase(String stepUuid, String extractDestinationDir, BigQueryServices bqServices) { + this.stepUuid = checkNotNull(stepUuid, "stepUuid"); this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir"); this.bqServices = checkNotNull(bqServices, "bqServices"); - this.executingProject = checkNotNull(executingProject, "executingProject"); } @Override @@ -91,8 +86,9 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); JobService jobService = bqServices.getJobService(bqOptions); - String extractJobId = BigQueryIO.getExtractJobId(jobIdToken); - List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService); + String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); + List<String> tempFiles = executeExtract( + extractJobId, tableToExtract, jobService, bqOptions.getProject()); TableSchema tableSchema = bqServices.getDatasetService(bqOptions) .getTable(tableToExtract).getSchema(); @@ -118,10 +114,10 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { } private List<String> executeExtract( - String jobId, TableReference table, JobService jobService) + String jobId, TableReference table, JobService jobService, String executingProject) throws InterruptedException, IOException { JobReference jobRef = new JobReference() - .setProjectId(executingProject.get()) + .setProjectId(executingProject) .setJobId(jobId); String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir); http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index 5ec8b57..e754bd2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -43,25 +43,22 @@ class BigQueryTableSource extends BigQuerySourceBase { private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class); static BigQueryTableSource create( - ValueProvider<String> jobIdToken, + String stepUuid, ValueProvider<TableReference> table, String extractDestinationDir, - BigQueryServices bqServices, - ValueProvider<String> executingProject) { - return new BigQueryTableSource( - jobIdToken, table, extractDestinationDir, bqServices, executingProject); + BigQueryServices bqServices) { + return new BigQueryTableSource(stepUuid, table, extractDestinationDir, bqServices); } private final ValueProvider<String> jsonTable; private final AtomicReference<Long> tableSizeBytes; private BigQueryTableSource( - ValueProvider<String> jobIdToken, + String stepUuid, ValueProvider<TableReference> table, String extractDestinationDir, - BigQueryServices bqServices, - ValueProvider<String> executingProject) { - super(jobIdToken, extractDestinationDir, bqServices, executingProject); + BigQueryServices bqServices) { + super(stepUuid, extractDestinationDir, bqServices); this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson()); this.tableSizeBytes = new AtomicReference<>(); } http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index baa5621..ef3419e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -1213,11 +1215,9 @@ public class BigQueryIOTest implements Serializable { datasetService.insertAll(table, expected, null); Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI"); - String jobIdToken = "testJobIdToken"; + String stepUuid = "testStepUuid"; BoundedSource<TableRow> bqSource = BigQueryTableSource.create( - StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table), - baseDir.toString(), fakeBqServices, - StaticValueProvider.of("project")); + stepUuid, StaticValueProvider.of(table), baseDir.toString(), fakeBqServices); PipelineOptions options = PipelineOptionsFactory.create(); Assert.assertThat( @@ -1255,15 +1255,15 @@ public class BigQueryIOTest implements Serializable { Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit"); - String jobIdToken = "testJobIdToken"; + String stepUuid = "testStepUuid"; String extractDestinationDir = baseDir.toString(); BoundedSource<TableRow> bqSource = BigQueryTableSource.create( - StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table), - extractDestinationDir, fakeBqServices, StaticValueProvider.of("project")); - + stepUuid, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices); PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(baseDir.toString()); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + bqOptions.setProject("project"); List<TableRow> read = SourceTestUtils.readFromSource(bqSource, options); assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class))); @@ -1316,10 +1316,17 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "e").set("number", 5L), new TableRow().set("name", "f").set("number", 6L)); - TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); - fakeDatasetService.createDataset("project", "data_set", "", ""); + PipelineOptions options = PipelineOptionsFactory.create(); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + bqOptions.setProject("project"); + String stepUuid = "testStepUuid"; + + TableReference tempTableReference = createTempTableReference( + bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid)); + fakeDatasetService.createDataset( + bqOptions.getProject(), tempTableReference.getDatasetId(), "", ""); fakeDatasetService.createTable(new Table() - .setTableReference(destinationTable) + .setTableReference(tempTableReference) .setSchema(new TableSchema() .setFields( ImmutableList.of( @@ -1327,24 +1334,21 @@ public class BigQueryIOTest implements Serializable { new TableFieldSchema().setName("number").setType("INTEGER"))))); Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryQuerySourceInitSplit"); - String jobIdToken = "testJobIdToken"; + String query = FakeBigQueryServices.encodeQuery(expected); String extractDestinationDir = baseDir.toString(); BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( - StaticValueProvider.of(jobIdToken), StaticValueProvider.of(query), - StaticValueProvider.of(destinationTable), + stepUuid, StaticValueProvider.of(query), true /* flattenResults */, true /* useLegacySql */, extractDestinationDir, fakeBqServices); - - PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(extractDestinationDir); TableReference queryTable = new TableReference() - .setProjectId("project") - .setDatasetId("data_set") - .setTableId("table_name"); + .setProjectId(bqOptions.getProject()) + .setDatasetId(tempTableReference.getDatasetId()) + .setTableId(tempTableReference.getTableId()); - fakeJobService.expectDryRunQuery("project", query, + fakeJobService.expectDryRunQuery(bqOptions.getProject(), query, new JobStatistics().setQuery( new JobStatistics2() .setTotalBytesProcessed(100L) @@ -1387,7 +1391,13 @@ public class BigQueryIOTest implements Serializable { .withJobService(jobService) .withDatasetService(datasetService); - TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); + PipelineOptions options = PipelineOptionsFactory.create(); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + bqOptions.setProject("project"); + String stepUuid = "testStepUuid"; + + TableReference tempTableReference = createTempTableReference( + bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid)); List<TableRow> expected = ImmutableList.of( new TableRow().set("name", "a").set("number", 1L), new TableRow().set("name", "b").set("number", 2L), @@ -1395,10 +1405,10 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "d").set("number", 4L), new TableRow().set("name", "e").set("number", 5L), new TableRow().set("name", "f").set("number", 6L)); - datasetService.createDataset(destinationTable.getProjectId(), destinationTable.getDatasetId(), - "", ""); + datasetService.createDataset( + tempTableReference.getProjectId(), tempTableReference.getDatasetId(), "", ""); Table table = new Table() - .setTableReference(destinationTable) + .setTableReference(tempTableReference) .setSchema(new TableSchema() .setFields( ImmutableList.of( @@ -1413,18 +1423,15 @@ public class BigQueryIOTest implements Serializable { .setTotalBytesProcessed(100L) .setReferencedTables(ImmutableList.of(table.getTableReference())))); - Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryNoTableQuerySourceInitSplit"); - String jobIdToken = "testJobIdToken"; + Path baseDir = Files.createTempDirectory( + tempFolder, "testBigQueryNoTableQuerySourceInitSplit"); BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( - StaticValueProvider.of(jobIdToken), + stepUuid, StaticValueProvider.of(query), - StaticValueProvider.of(destinationTable), true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices); - - - PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(baseDir.toString()); + List<TableRow> read = convertBigDecimaslToLong( SourceTestUtils.readFromSource(bqSource, options)); assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class))); http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index bef9a26..13d345e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -76,7 +76,6 @@ import org.joda.time.Duration; */ class FakeJobService implements JobService, Serializable { static final JsonFactory JSON_FACTORY = Transport.getJsonFactory(); - // Whenever a job is started, the first 5 calls to GetJob will report the job as pending, // the next 5 will return the job as running, and only then will the job report as done. private static final int GET_JOBS_TRANSITION_INTERVAL = 5; @@ -240,7 +239,9 @@ class FakeJobService implements JobService, Serializable { job.job.setStatus(runJob(job.job)); } } catch (Exception e) { - job.job.getStatus().setState("FAILED").setErrorResult(new ErrorProto()); + job.job.getStatus().setState("FAILED").setErrorResult( + new ErrorProto().setMessage( + String.format("Job %s failed: %s", job.job.getConfiguration(), e.toString()))); } return JSON_FACTORY.fromString(JSON_FACTORY.toString(job.job), Job.class); }
