Repository: beam Updated Branches: refs/heads/master a91571ef9 -> f9b5d55e5
Combine jobName with stepUUID for BQIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a4d2a5de Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a4d2a5de Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a4d2a5de Branch: refs/heads/master Commit: a4d2a5ded77803fa1243c86d2008b05865c464f2 Parents: a91571e Author: Sam McVeety <[email protected]> Authored: Mon Dec 26 20:59:08 2016 -0800 Committer: Dan Halperin <[email protected]> Committed: Tue Jan 17 10:06:30 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 111 +++++++++++++------ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 58 +++++++++- 2 files changed, 132 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a4d2a5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 4b19973..701374d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -379,6 +379,51 @@ public class BigQueryIO { } } + @VisibleForTesting + static class BeamJobUuidToBigQueryJobUuid + implements SerializableFunction<String, String> { + @Override + public String apply(String from) { + return "beam_job_" + from; + } + } + + @VisibleForTesting + static class CreatePerBeamJobUuid + implements SerializableFunction<String, String> { + private final String stepUuid; + + private CreatePerBeamJobUuid(String stepUuid) { + this.stepUuid = stepUuid; + } + + @Override + public String apply(String jobUuid) { + return stepUuid + "_" + jobUuid.replaceAll("-", ""); + } + } + + @VisibleForTesting + static class CreateJsonTableRefFromUuid + implements SerializableFunction<String, TableReference> { + private final String executingProject; + + private CreateJsonTableRefFromUuid(String executingProject) { + this.executingProject = executingProject; + } + + @Override + public TableReference apply(String jobUuid) { + String queryTempDatasetId = "temp_dataset_" + jobUuid; + String queryTempTableId = "temp_table_" + jobUuid; + TableReference queryTempTableRef = new TableReference() + .setProjectId(executingProject) + .setDatasetId(queryTempDatasetId) + .setTableId(queryTempTableId); + return queryTempTableRef; + } + } + @Nullable private static ValueProvider<String> displayTable( @Nullable ValueProvider<TableReference> table) { @@ -471,6 +516,9 @@ public class BigQueryIO { @Nullable final Boolean useLegacySql; @Nullable BigQueryServices bigQueryServices; + @VisibleForTesting @Nullable String stepUuid; + @VisibleForTesting @Nullable ValueProvider<String> jobUuid; + private static final String QUERY_VALIDATION_FAILURE_ERROR = "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the" + " pipeline, This validation can be disabled using #withoutValidation."; @@ -667,10 +715,12 @@ public class BigQueryIO { @Override public PCollection<TableRow> expand(PBegin input) { - String uuid = randomUUIDString(); - final String jobIdToken = "beam_job_" + uuid; - + stepUuid = randomUUIDString(); BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + jobUuid = NestedValueProvider.of( + StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid)); + final ValueProvider<String> jobIdToken = NestedValueProvider.of( + jobUuid, new BeamJobUuidToBigQueryJobUuid()); BoundedSource<TableRow> source; final BigQueryServices bqServices = getBigQueryServices(); @@ -679,7 +729,7 @@ public class BigQueryIO { String tempLocation = bqOptions.getTempLocation(); try { IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - extractDestinationDir = factory.resolve(tempLocation, uuid); + extractDestinationDir = factory.resolve(tempLocation, stepUuid); } catch (IOException e) { throw new RuntimeException( String.format("Failed to resolve extract destination directory in %s", tempLocation)); @@ -687,18 +737,9 @@ public class BigQueryIO { final String executingProject = bqOptions.getProject(); if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) { - String queryTempDatasetId = "temp_dataset_" + uuid; - String queryTempTableId = "temp_table_" + uuid; - - TableReference queryTempTableRef = new TableReference() - .setProjectId(executingProject) - .setDatasetId(queryTempDatasetId) - .setTableId(queryTempTableId); - String jsonTableRef = toJsonString(queryTempTableRef); - source = BigQueryQuerySource.create( jobIdToken, query, NestedValueProvider.of( - StaticValueProvider.of(jsonTableRef), new JsonTableRefToTableRef()), + jobUuid, new CreateJsonTableRefFromUuid(executingProject)), flattenResults, useLegacySql, extractDestinationDir, bqServices); } else { ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions); @@ -913,7 +954,7 @@ public class BigQueryIO { static class BigQueryTableSource extends BigQuerySourceBase { static BigQueryTableSource create( - String jobIdToken, + ValueProvider<String> jobIdToken, ValueProvider<TableReference> table, String extractDestinationDir, BigQueryServices bqServices, @@ -926,7 +967,7 @@ public class BigQueryIO { private final AtomicReference<Long> tableSizeBytes; private BigQueryTableSource( - String jobIdToken, + ValueProvider<String> jobIdToken, ValueProvider<TableReference> table, String extractDestinationDir, BigQueryServices bqServices, @@ -982,7 +1023,7 @@ public class BigQueryIO { static class BigQueryQuerySource extends BigQuerySourceBase { static BigQueryQuerySource create( - String jobIdToken, + ValueProvider<String> jobIdToken, ValueProvider<String> query, ValueProvider<TableReference> queryTempTableRef, Boolean flattenResults, @@ -1006,7 +1047,7 @@ public class BigQueryIO { private transient AtomicReference<JobStatistics> dryRunJobStats; private BigQueryQuerySource( - String jobIdToken, + ValueProvider<String> jobIdToken, ValueProvider<String> query, ValueProvider<TableReference> queryTempTableRef, Boolean flattenResults, @@ -1063,7 +1104,7 @@ public class BigQueryIO { "Dataset for BigQuery query job temporary table"); // 3. Execute the query. - String queryJobId = jobIdToken + "-query"; + String queryJobId = jobIdToken.get() + "-query"; executeQuery( executingProject.get(), queryJobId, @@ -1161,13 +1202,13 @@ public class BigQueryIO { // The initial backoff for verifying temp files. private static final Duration INITIAL_FILES_VERIFY_BACKOFF = Duration.standardSeconds(1); - protected final String jobIdToken; + protected final ValueProvider<String> jobIdToken; protected final String extractDestinationDir; protected final BigQueryServices bqServices; protected final ValueProvider<String> executingProject; private BigQuerySourceBase( - String jobIdToken, + ValueProvider<String> jobIdToken, String extractDestinationDir, BigQueryServices bqServices, ValueProvider<String> executingProject) { @@ -1396,8 +1437,8 @@ public class BigQueryIO { } } - private static String getExtractJobId(String jobIdToken) { - return jobIdToken + "-extract"; + private static String getExtractJobId(ValueProvider<String> jobIdToken) { + return jobIdToken.get() + "-extract"; } private static String getExtractDestinationUri(String extractDestinationDir) { @@ -1644,6 +1685,9 @@ public class BigQueryIO { @Nullable private BigQueryServices bigQueryServices; + @VisibleForTesting @Nullable String stepUuid; + @VisibleForTesting @Nullable ValueProvider<String> jobUuid; + private static class TranslateTableSpecFunction implements SerializableFunction<BoundedWindow, TableReference> { private SerializableFunction<BoundedWindow, String> tableSpecFunction; @@ -1924,14 +1968,19 @@ public class BigQueryIO { ValueProvider<TableReference> table = getTableWithDefaultProject(options); - String jobIdToken = "beam_job_" + randomUUIDString(); + stepUuid = randomUUIDString(); + jobUuid = NestedValueProvider.of( + StaticValueProvider.of(options.getJobName()), new CreatePerBeamJobUuid(stepUuid)); + ValueProvider<String> jobIdToken = NestedValueProvider.of( + jobUuid, new BeamJobUuidToBigQueryJobUuid()); + String tempLocation = options.getTempLocation(); String tempFilePrefix; try { IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); tempFilePrefix = factory.resolve( factory.resolve(tempLocation, "BigQueryWriteTemp"), - jobIdToken); + stepUuid); } catch (IOException e) { throw new RuntimeException( String.format("Failed to resolve BigQuery temp location in %s", tempLocation), @@ -2248,7 +2297,7 @@ public class BigQueryIO { static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> { private final boolean singlePartition; private final BigQueryServices bqServices; - private final String jobIdToken; + private final ValueProvider<String> jobIdToken; private final String tempFilePrefix; private final ValueProvider<String> jsonTableRef; private final ValueProvider<String> jsonSchema; @@ -2258,7 +2307,7 @@ public class BigQueryIO { public WriteTables( boolean singlePartition, BigQueryServices bqServices, - String jobIdToken, + ValueProvider<String> jobIdToken, String tempFilePrefix, ValueProvider<String> jsonTableRef, ValueProvider<String> jsonSchema, @@ -2277,7 +2326,7 @@ public class BigQueryIO { @ProcessElement public void processElement(ProcessContext c) throws Exception { List<String> partition = Lists.newArrayList(c.element().getValue()).get(0); - String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey()); + String jobIdPrefix = String.format(jobIdToken.get() + "_%05d", c.element().getKey()); TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class); if (!singlePartition) { ref.setTableId(jobIdPrefix); @@ -2383,7 +2432,7 @@ public class BigQueryIO { */ static class WriteRename extends DoFn<String, Void> { private final BigQueryServices bqServices; - private final String jobIdToken; + private final ValueProvider<String> jobIdToken; private final ValueProvider<String> jsonTableRef; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; @@ -2391,7 +2440,7 @@ public class BigQueryIO { public WriteRename( BigQueryServices bqServices, - String jobIdToken, + ValueProvider<String> jobIdToken, ValueProvider<String> jsonTableRef, WriteDisposition writeDisposition, CreateDisposition createDisposition, @@ -2419,7 +2468,7 @@ public class BigQueryIO { } copy( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - jobIdToken, + jobIdToken.get(), fromJsonString(jsonTableRef.get(), TableReference.class), tempTables, writeDisposition, http://git-wip-us.apache.org/repos/asf/beam/blob/a4d2a5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 471b5e4..3e8c2c9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -26,6 +26,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -1646,7 +1647,8 @@ public class BigQueryIOTest implements Serializable { TableReference table = BigQueryIO.parseTableSpec("project.data_set.table_name"); String extractDestinationDir = "mock://tempLocation"; BoundedSource<TableRow> bqSource = BigQueryTableSource.create( - jobIdToken, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices, + StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table), + extractDestinationDir, fakeBqServices, StaticValueProvider.of("project")); List<TableRow> expected = ImmutableList.of( @@ -1684,7 +1686,7 @@ public class BigQueryIOTest implements Serializable { TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name"); String extractDestinationDir = "mock://tempLocation"; BoundedSource<TableRow> bqSource = BigQueryTableSource.create( - jobIdToken, StaticValueProvider.of(table), + StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table), extractDestinationDir, fakeBqServices, StaticValueProvider.of("project")); List<TableRow> expected = ImmutableList.of( @@ -1750,7 +1752,8 @@ public class BigQueryIOTest implements Serializable { String extractDestinationDir = "mock://tempLocation"; TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name"); BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( - jobIdToken, StaticValueProvider.of("query"), StaticValueProvider.of(destinationTable), + StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"), + StaticValueProvider.of(destinationTable), true /* flattenResults */, true /* useLegacySql */, extractDestinationDir, fakeBqServices); @@ -1842,7 +1845,8 @@ public class BigQueryIOTest implements Serializable { String extractDestinationDir = "mock://tempLocation"; TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name"); BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( - jobIdToken, StaticValueProvider.of("query"), StaticValueProvider.of(destinationTable), + StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"), + StaticValueProvider.of(destinationTable), true /* flattenResults */, true /* useLegacySql */, extractDestinationDir, fakeBqServices); @@ -2117,7 +2121,7 @@ public class BigQueryIOTest implements Serializable { WriteTables writeTables = new WriteTables( false, fakeBqServices, - jobIdToken, + StaticValueProvider.of(jobIdToken), tempFilePrefix, StaticValueProvider.of(jsonTable), StaticValueProvider.of(jsonSchema), @@ -2195,7 +2199,7 @@ public class BigQueryIOTest implements Serializable { WriteRename writeRename = new WriteRename( fakeBqServices, - jobIdToken, + StaticValueProvider.of(jobIdToken), StaticValueProvider.of(jsonTable), WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, @@ -2358,4 +2362,46 @@ public class BigQueryIOTest implements Serializable { public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() { CoderProperties.coderSerializable(BigQueryIO.ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE)); } + + @Test + public void testUniqueStepIdRead() { + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + Pipeline pipeline = TestPipeline.create(options); + bqOptions.setTempLocation("gs://testbucket/testdir"); + BigQueryIO.Read.Bound read1 = BigQueryIO.Read.fromQuery( + options.getInputQuery()).withoutValidation(); + pipeline.apply(read1); + BigQueryIO.Read.Bound read2 = BigQueryIO.Read.fromQuery( + options.getInputQuery()).withoutValidation(); + pipeline.apply(read2); + assertNotEquals(read1.stepUuid, read2.stepUuid); + assertNotEquals(read1.jobUuid.get(), read2.jobUuid.get()); + } + + @Test + public void testUniqueStepIdWrite() { + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + bqOptions.setTempLocation("gs://testbucket/testdir"); + Pipeline pipeline = TestPipeline.create(options); + BigQueryIO.Write.Bound write1 = BigQueryIO.Write + .to(options.getOutputTable()) + .withSchema(NestedValueProvider.of( + options.getOutputSchema(), new JsonSchemaToTableSchema())) + .withoutValidation(); + BigQueryIO.Write.Bound write2 = BigQueryIO.Write + .to(options.getOutputTable()) + .withSchema(NestedValueProvider.of( + options.getOutputSchema(), new JsonSchemaToTableSchema())) + .withoutValidation(); + pipeline + .apply(Create.<TableRow>of()) + .apply(write1); + pipeline + .apply(Create.<TableRow>of()) + .apply(write2); + assertNotEquals(write1.stepUuid, write2.stepUuid); + assertNotEquals(write1.jobUuid.get(), write2.jobUuid.get()); + } }
