Repository: beam Updated Branches: refs/heads/master c1b7f8695 -> 68b4c34a4
[BEAM-1258] BigQueryServicesImpl.getTable() returns null when tables not found. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e670e7e0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e670e7e0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e670e7e0 Branch: refs/heads/master Commit: e670e7e0aa19845a162f7da423b663cbd4199f4d Parents: c1b7f86 Author: Pei He <[email protected]> Authored: Tue Jan 10 11:49:37 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Tue Jan 10 13:51:07 2017 -0800 ---------------------------------------------------------------------- .../sdk/io/gcp/bigquery/BigQueryServices.java | 5 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 32 +++++++--- .../gcp/bigquery/BigQueryServicesImplTest.java | 65 ++++++++++++++++++++ 3 files changed, 94 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e670e7e0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 8ca473d..7173996 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -114,8 +114,11 @@ interface BigQueryServices extends Serializable { */ interface DatasetService { /** - * Gets the specified {@link Table} resource by table ID or {@code null} if no table exists. + * Gets the specified {@link Table} resource by table ID. + * + * <p>Returns null if the table is not found. */ + @Nullable Table getTable(String projectId, String datasetId, String tableId) throws InterruptedException, IOException; http://git-wip-us.apache.org/repos/asf/beam/blob/e670e7e0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 4eb8e7b..c4c7344 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -393,19 +393,37 @@ class BigQueryServicesImpl implements BigQueryServices { * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. */ @Override + @Nullable public Table getTable(String projectId, String datasetId, String tableId) throws IOException, InterruptedException { BackOff backoff = FluentBackoff.DEFAULT .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); - return executeWithRetries( - client.tables().get(projectId, datasetId, tableId), - String.format( - "Unable to get table: %s, aborting after %d retries.", - tableId, MAX_RPC_RETRIES), - Sleeper.DEFAULT, + return getTable( + new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId), backoff, - DONT_RETRY_NOT_FOUND); + Sleeper.DEFAULT); + } + + @VisibleForTesting + @Nullable + Table getTable(TableReference ref, BackOff backoff, Sleeper sleeper) + throws IOException, InterruptedException { + try { + return executeWithRetries( + client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()), + String.format( + "Unable to get table: %s, aborting after %d retries.", + ref.getTableId(), MAX_RPC_RETRIES), + sleeper, + backoff, + DONT_RETRY_NOT_FOUND); + } catch (IOException e) { + if (errorExtractor.itemNotFound(e)) { + return null; + } + throw e; + } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/e670e7e0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 10ed8bd..bfd1319 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -320,6 +320,71 @@ public class BigQueryServicesImplTest { } @Test + public void testGetTableSucceeds() throws Exception { + TableReference tableRef = new TableReference() + .setProjectId("projectId") + .setDatasetId("datasetId") + .setTableId("tableId"); + + Table testTable = new Table(); + testTable.setTableReference(tableRef); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) + .thenReturn(toStream(testTable)); + + BigQueryServicesImpl.DatasetServiceImpl datasetService = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + + Table table = datasetService.getTable(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT); + + assertEquals(testTable, table); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + } + + @Test + public void testGetTableNotFound() throws IOException, InterruptedException { + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(404); + + BigQueryServicesImpl.DatasetServiceImpl datasetService = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + + TableReference tableRef = new TableReference() + .setProjectId("projectId") + .setDatasetId("datasetId") + .setTableId("tableId"); + Table table = datasetService.getTable(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT); + + assertNull(table); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + @Test + public void testGetTableThrows() throws Exception { + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(401); + + TableReference tableRef = new TableReference() + .setProjectId("projectId") + .setDatasetId("datasetId") + .setTableId("tableId"); + + thrown.expect(IOException.class); + thrown.expectMessage(String.format("Unable to get table: %s", tableRef.getTableId())); + + BigQueryServicesImpl.DatasetServiceImpl datasetService = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + datasetService.getTable(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT); + } + + @Test public void testExecuteWithRetries() throws IOException, InterruptedException { Table testTable = new Table();
