Ensure that each triggered load generates a different job id (needed for streaming triggered file loads, where every pane would otherwise reuse the same job id), and add test coverage that fails on duplicate job ids.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/518c158f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/518c158f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/518c158f Branch: refs/heads/master Commit: 518c158f82249091a54dca17ae348734f5abe633 Parents: 0aae7aa Author: Reuven Lax <[email protected]> Authored: Tue Aug 15 21:58:14 2017 -0700 Committer: Reuven Lax <[email protected]> Committed: Thu Aug 17 20:25:19 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 12 ++++++++---- .../apache/beam/sdk/io/gcp/bigquery/WriteRename.java | 3 ++- .../apache/beam/sdk/io/gcp/bigquery/WriteTables.java | 4 ++-- .../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +- .../apache/beam/sdk/io/gcp/bigquery/FakeJobService.java | 9 +++++++++ 5 files changed, 22 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 09508e0..78dcdde 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -225,15 +225,19 @@ public class BigQueryHelpers { } // Create a unique job id for a table load. 
- static String createJobId(String prefix, TableDestination tableDestination, int partition) { + static String createJobId(String prefix, TableDestination tableDestination, int partition, + long index) { // Job ID must be different for each partition of each table. String destinationHash = Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString(); + String jobId = String.format("%s_%s", prefix, destinationHash); if (partition >= 0) { - return String.format("%s_%s_%05d", prefix, destinationHash, partition); - } else { - return String.format("%s_%s", prefix, destinationHash); + jobId += String.format("_%05d", partition); + } + if (index >= 0) { + jobId += String.format("_%05d", index); } + return jobId; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index eb1da5f..ff69476 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -101,7 +101,8 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> { // Make sure each destination table gets a unique job id. 
String jobIdPrefix = - BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1); + BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1, + c.pane().getIndex()); copy( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 24911a7..c8fab75 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -119,8 +119,8 @@ class WriteTables<DestinationT> Integer partition = c.element().getKey().getShardNumber(); List<String> partitionFiles = Lists.newArrayList(c.element().getValue()); - String jobIdPrefix = - BigQueryHelpers.createJobId(c.sideInput(jobIdToken), tableDestination, partition); + String jobIdPrefix = BigQueryHelpers.createJobId( + c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex()); if (!singlePartition) { tableReference.setTableId(jobIdPrefix); http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 3d53b7e..43a494e 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1955,7 +1955,7 @@ public class BigQueryIOTest implements Serializable { String tableName = String.format("project-id:dataset-id.table%05d", i); TableDestination tableDestination = new TableDestination(tableName, tableName); for (int j = 0; j < numPartitions; ++j) { - String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j); + String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j, -1); List<String> filesPerPartition = Lists.newArrayList(); for (int k = 0; k < numFilesPerPartition; ++k) { String filename = Paths.get(baseDir.toString(), http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index 2045bb7..7d5101d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -109,6 +109,7 @@ class FakeJobService implements JobService, Serializable { public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException { synchronized (allJobs) { + verifyUniqueJobId(jobRef.getJobId()); Job job = new Job(); job.setJobReference(jobRef); job.setConfiguration(new JobConfiguration().setLoad(loadConfig)); @@ -141,6 +142,7 @@ class FakeJobService implements JobService, Serializable { 
checkArgument(extractConfig.getDestinationFormat().equals("AVRO"), "Only extract to AVRO is supported"); synchronized (allJobs) { + verifyUniqueJobId(jobRef.getJobId()); ++numExtractJobCalls; Job job = new Job(); @@ -175,6 +177,7 @@ class FakeJobService implements JobService, Serializable { public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) throws IOException, InterruptedException { synchronized (allJobs) { + verifyUniqueJobId(jobRef.getJobId()); Job job = new Job(); job.setJobReference(jobRef); job.setConfiguration(new JobConfiguration().setCopy(copyConfig)); @@ -257,6 +260,12 @@ class FakeJobService implements JobService, Serializable { } } + private void verifyUniqueJobId(String jobId) throws IOException { + if (allJobs.containsColumn(jobId)) { + throw new IOException("Duplicate job id " + jobId); + } + } + private JobStatus runJob(Job job) throws InterruptedException, IOException { if (job.getConfiguration().getLoad() != null) { return runLoadJob(job.getJobReference(), job.getConfiguration().getLoad());
