Repository: beam Updated Branches: refs/heads/master 2cbc08b58 -> 83f8c460c
[BEAM-1235] BigQueryIO.Write: log failed load/copy jobs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6531545e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6531545e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6531545e Branch: refs/heads/master Commit: 6531545e647f98870a69bd46fabbbadb727969e5 Parents: 2cbc08b Author: Pei He <[email protected]> Authored: Mon Jan 23 16:25:43 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Thu Jan 26 17:22:52 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 63 ++++++++++++------- .../io/gcp/bigquery/BigQueryServicesImpl.java | 1 + .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 64 +++++++++++++------- .../gcp/bigquery/BigQueryServicesImplTest.java | 2 + 4 files changed, 87 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index b6f9fb0..4ace985 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1155,7 +1155,8 @@ public class BigQueryIO { jobService.startQueryJob(jobRef, queryConfig); Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); if (parseStatus(job) != Status.SUCCEEDED) { - throw new IOException("Query job failed: " + jobId); + throw new IOException(String.format( + "Query job %s failed, status: %s.", jobId, statusToPrettyString(job.getStatus()))); } } @@ -1260,8 +1261,8 @@ public class BigQueryIO { jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); if (parseStatus(extractJob) != Status.SUCCEEDED) { throw new IOException(String.format( - "Extract job %s failed, status: %s", - extractJob.getJobReference().getJobId(), extractJob.getStatus())); + "Extract job %s failed, status: %s.", + extractJob.getJobReference().getJobId(), statusToPrettyString(extractJob.getStatus()))); } List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob); @@ -2361,30 +2362,36 @@ public class BigQueryIO { .setSourceFormat("NEWLINE_DELIMITED_JSON"); String projectId = ref.getProjectId(); + Job lastFailedLoadJob = null; for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; - LOG.info("Starting BigQuery load job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS); JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); jobService.startLoadJob(jobRef, loadConfig); - Status jobStatus = - parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES)); + Job loadJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES); + Status jobStatus = parseStatus(loadJob); switch (jobStatus) { case SUCCEEDED: return; case UNKNOWN: - throw new RuntimeException("Failed to poll the load job status of job " + jobId); + throw new RuntimeException(String.format( + "UNKNOWN status of load job [%s]: %s.", jobId, jobToPrettyString(loadJob))); case FAILED: - LOG.info("BigQuery load job failed: {}", jobId); + lastFailedLoadJob = loadJob; continue; default: - throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", - jobStatus, jobId)); + throw new IllegalStateException(String.format( + "Unexpected status [%s] of load job: %s.", + jobStatus, jobToPrettyString(loadJob))); } } - throw new RuntimeException(String.format("Failed to create the load job %s, reached max " - + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS)); + throw new RuntimeException(String.format( + "Failed to create load job with id prefix %s, " + + "reached max retries: %d, last failed load job: %s.", + jobIdPrefix, + Bound.MAX_RETRY_JOBS, + jobToPrettyString(lastFailedLoadJob))); } static void removeTemporaryFiles( @@ -2491,30 +2498,36 @@ public class BigQueryIO { .setCreateDisposition(createDisposition.name()); String projectId = ref.getProjectId(); + Job lastFailedCopyJob = null; for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; - LOG.info("Starting BigQuery copy job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS); JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); jobService.startCopyJob(jobRef, copyConfig); - Status jobStatus = - parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES)); + Job copyJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES); + Status jobStatus = parseStatus(copyJob); switch (jobStatus) { case SUCCEEDED: return; case UNKNOWN: - throw new RuntimeException("Failed to poll the copy job status of job " + jobId); + throw new RuntimeException(String.format( + "UNKNOWN status of copy job [%s]: %s.", jobId, jobToPrettyString(copyJob))); case FAILED: - LOG.info("BigQuery copy job failed: {}", jobId); + lastFailedCopyJob = copyJob; continue; default: - throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", - jobStatus, jobId)); + throw new IllegalStateException(String.format( + "Unexpected status [%s] of load job: %s.", + jobStatus, jobToPrettyString(copyJob))); } } - throw new RuntimeException(String.format("Failed to create the copy job %s, reached max " - + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS)); + throw new RuntimeException(String.format( + "Failed to create copy job with id prefix %s, " + + "reached max retries: %d, last failed copy job: %s.", + jobIdPrefix, + Bound.MAX_RETRY_JOBS, + jobToPrettyString(lastFailedCopyJob))); } static void removeTemporaryTables(DatasetService tableService, @@ -2549,6 +2562,14 @@ public class BigQueryIO { private Write() {} } + private static String jobToPrettyString(@Nullable Job job) throws IOException { + return job == null ? "null" : job.toPrettyString(); + } + + private static String statusToPrettyString(@Nullable JobStatus status) throws IOException { + return status == null ? "Unknown status: null." : status.toPrettyString(); + } + private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) { try { datasetService.getDataset(table.getProjectId(), table.getDatasetId()); http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 75796ab..7c3edbe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -214,6 +214,7 @@ class BigQueryServicesImpl implements BigQueryServices { do { try { client.jobs().insert(jobRef.getProjectId(), job).execute(); + LOG.info("Started BigQuery job: {}.", jobRef); return; // SUCCEEDED } catch (GoogleJsonResponseException e) { if (errorExtractor.itemAlreadyExists(e)) { http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 0b8d60d..bbfc2ce 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -988,12 +988,6 @@ public class BigQueryIOTest implements Serializable { .withoutValidation()); p.run(); - logged.verifyInfo("Starting BigQuery load job"); - logged.verifyInfo("BigQuery load job failed"); - logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyInfo("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyNotLogged("try 3/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); File tempDir = new File(bqOptions.getTempLocation()); testNumFiles(tempDir, 0); } @@ -1232,11 +1226,49 @@ public class BigQueryIOTest implements Serializable { .withoutValidation()); thrown.expect(RuntimeException.class); - thrown.expectMessage("Failed to poll the load job status"); - p.run(); + thrown.expectMessage("UNKNOWN status of load job"); + try { + p.run(); + } finally { + File tempDir = new File(bqOptions.getTempLocation()); + testNumFiles(tempDir, 0); + } + } - File tempDir = new File(bqOptions.getTempLocation()); - testNumFiles(tempDir, 0); + @Test + @Category(NeedsRunner.class) + public void testWriteFailedJobs() throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService() + .startJobReturns("done", "done", "done") + .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED)); + + Pipeline p = TestPipeline.create(bqOptions); + p.apply(Create.of( + new TableRow().set("name", "a").set("number", 1), + new TableRow().set("name", "b").set("number", 2), + new TableRow().set("name", "c").set("number", 3)) + .withCoder(TableRowJsonCoder.of())) + .apply(BigQueryIO.Write.to("dataset-id.table-id") + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withTestServices(fakeBqServices) + .withoutValidation()); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Failed to create load job with id prefix"); + thrown.expectMessage("reached max retries"); + thrown.expectMessage("last failed load job"); + + try { + p.run(); + } finally { + File tempDir = new File(bqOptions.getTempLocation()); + testNumFiles(tempDir, 0); + } } @Test @@ -2164,12 +2196,6 @@ public class BigQueryIOTest implements Serializable { List<String> tempTables = tester.takeOutputElements(); - logged.verifyInfo("Starting BigQuery load job"); - logged.verifyInfo("BigQuery load job failed"); - logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyNotLogged("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - assertEquals(expectedTempTables, tempTables); } @@ -2237,12 +2263,6 @@ public class BigQueryIOTest implements Serializable { DoFnTester<String, Void> tester = DoFnTester.of(writeRename); tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables); tester.processElement(null); - - logged.verifyInfo("Starting BigQuery copy job"); - logged.verifyInfo("BigQuery copy job failed"); - logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyNotLogged("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 1ce10f1..ef51650 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -138,6 +138,7 @@ public class BigQueryServicesImplTest { verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); verify(response, times(1)).getContentType(); + expectedLogs.verifyInfo(String.format("Started BigQuery job: %s", jobRef)); } /** @@ -161,6 +162,7 @@ public class BigQueryServicesImplTest { verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); verify(response, times(1)).getContentType(); + expectedLogs.verifyNotLogged("Started BigQuery job"); } /**
