Repository: beam Updated Branches: refs/heads/master e77de7c61 -> 7402d7600
Refactor BigQueryServices to have TableReference in methods signatures Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9d1d682 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9d1d682 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9d1d682 Branch: refs/heads/master Commit: f9d1d682340fa3083bc18723605bf3d0aa6d76cd Parents: e77de7c Author: Pei He <[email protected]> Authored: Tue Jan 24 16:45:16 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Jan 24 18:00:40 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 40 +++++-------------- .../sdk/io/gcp/bigquery/BigQueryServices.java | 9 ++--- .../io/gcp/bigquery/BigQueryServicesImpl.java | 23 ++++------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 41 ++++++++------------ .../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 3 +- 5 files changed, 40 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index fa49f55..b6f9fb0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -997,8 +997,7 @@ public class BigQueryIO { TableReference table = JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class)) - .getTable(table.getProjectId(), table.getDatasetId(), table.getTableId()) - .getNumBytes(); + .getTable(table).getNumBytes(); tableSizeBytes.compareAndSet(null, numBytes); } return tableSizeBytes.get(); @@ -1088,10 +1087,7 @@ public class BigQueryIO { DatasetService tableService = bqServices.getDatasetService(bqOptions); if (referencedTables != null && !referencedTables.isEmpty()) { TableReference queryTable = referencedTables.get(0); - location = tableService.getTable( - queryTable.getProjectId(), - queryTable.getDatasetId(), - queryTable.getTableId()).getLocation(); + location = tableService.getTable(queryTable).getLocation(); } // 2. Create the temporary dataset in the query location. @@ -1120,10 +1116,7 @@ public class BigQueryIO { JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class); DatasetService tableService = bqServices.getDatasetService(bqOptions); - tableService.deleteTable( - tableToRemove.getProjectId(), - tableToRemove.getDatasetId(), - tableToRemove.getTableId()); + tableService.deleteTable(tableToRemove); tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId()); } @@ -1227,10 +1220,8 @@ public class BigQueryIO { String extractJobId = getExtractJobId(jobIdToken); List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService); - TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable( - tableToExtract.getProjectId(), - tableToExtract.getDatasetId(), - tableToExtract.getTableId()).getSchema(); + TableSchema tableSchema = bqServices.getDatasetService(bqOptions) + .getTable(tableToExtract).getSchema(); cleanupTempResource(bqOptions); return createSources(tempFiles, tableSchema); @@ -1867,13 +1858,9 @@ public class BigQueryIO { DatasetService datasetService, TableReference tableRef) { try { - if (datasetService.getTable( - tableRef.getProjectId(), - tableRef.getDatasetId(), - tableRef.getTableId()) != null) { + if (datasetService.getTable(tableRef) != null) { checkState( - datasetService.isTableEmpty( - tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()), + datasetService.isTableEmpty(tableRef), "BigQuery table is not empty: %s.", BigQueryIO.toTableSpec(tableRef)); } @@ -2535,10 +2522,7 @@ public class BigQueryIO { for (TableReference tableRef : tempTables) { try { LOG.debug("Deleting table {}", toJsonString(tableRef)); - tableService.deleteTable( - tableRef.getProjectId(), - tableRef.getDatasetId(), - tableRef.getTableId()); + tableService.deleteTable(tableRef); } catch (Exception e) { LOG.warn("Failed to delete the table {}", toJsonString(tableRef), e); } @@ -2587,7 +2571,7 @@ public class BigQueryIO { private static void verifyTablePresence(DatasetService datasetService, TableReference table) { try { - datasetService.getTable(table.getProjectId(), table.getDatasetId(), table.getTableId()); + datasetService.getTable(table); } catch (Exception e) { ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { @@ -2712,11 +2696,7 @@ public class BigQueryIO { // every thread from attempting a create and overwhelming our BigQuery quota. DatasetService datasetService = bqServices.getDatasetService(options); if (!createdTables.contains(tableSpec)) { - Table table = datasetService.getTable( - tableReference.getProjectId(), - tableReference.getDatasetId(), - tableReference.getTableId()); - if (table == null) { + if (datasetService.getTable(tableReference) == null) { TableSchema tableSchema = JSON_FACTORY.fromString( jsonTableSchema.get(), TableSchema.class); datasetService.createTable( http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 32cf46d..03e4391 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -119,8 +119,7 @@ interface BigQueryServices extends Serializable { * <p>Returns null if the table is not found. */ @Nullable - Table getTable(String projectId, String datasetId, String tableId) - throws InterruptedException, IOException; + Table getTable(TableReference tableRef) throws InterruptedException, IOException; /** * Creates the specified table if it does not exist. @@ -131,16 +130,14 @@ interface BigQueryServices extends Serializable { * Deletes the table specified by tableId from the dataset. * If the table contains data, all the data will be deleted. */ - void deleteTable(String projectId, String datasetId, String tableId) - throws IOException, InterruptedException; + void deleteTable(TableReference tableRef) throws IOException, InterruptedException; /** * Returns true if the table is empty. * * @throws IOException if the table is not found. */ - boolean isTableEmpty(String projectId, String datasetId, String tableId) - throws IOException, InterruptedException; + boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException; /** * Gets the specified {@link Dataset} resource by dataset ID. http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index c524ce4..75796ab 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -394,15 +394,12 @@ class BigQueryServicesImpl implements BigQueryServices { */ @Override @Nullable - public Table getTable(String projectId, String datasetId, String tableId) + public Table getTable(TableReference tableRef) throws IOException, InterruptedException { BackOff backoff = FluentBackoff.DEFAULT .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); - return getTable( - new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId), - backoff, - Sleeper.DEFAULT); + return getTable(tableRef, backoff, Sleeper.DEFAULT); } @VisibleForTesting @@ -506,31 +503,27 @@ class BigQueryServicesImpl implements BigQueryServices { * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. */ @Override - public void deleteTable(String projectId, String datasetId, String tableId) - throws IOException, InterruptedException { + public void deleteTable(TableReference tableRef) throws IOException, InterruptedException { BackOff backoff = FluentBackoff.DEFAULT .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); executeWithRetries( - client.tables().delete(projectId, datasetId, tableId), + client.tables().delete( + tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()), String.format( "Unable to delete table: %s, aborting after %d retries.", - tableId, MAX_RPC_RETRIES), + tableRef.getTableId(), MAX_RPC_RETRIES), Sleeper.DEFAULT, backoff, ALWAYS_RETRY); } @Override - public boolean isTableEmpty(String projectId, String datasetId, String tableId) - throws IOException, InterruptedException { + public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException { BackOff backoff = FluentBackoff.DEFAULT .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); - return isTableEmpty( - new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId), - backoff, - Sleeper.DEFAULT); + return isTableEmpty(tableRef, backoff, Sleeper.DEFAULT); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index ba7f44e..0b8d60d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; @@ -526,18 +527,18 @@ public class BigQueryIOTest implements Serializable { private static class FakeDatasetService implements DatasetService, Serializable { @Override - public Table getTable(String projectId, String datasetId, String tableId) + public Table getTable(TableReference tableRef) throws InterruptedException, IOException { synchronized (tables) { Map<String, TableContainer> dataset = checkNotNull( - tables.get(projectId, datasetId), + tables.get(tableRef.getProjectId(), tableRef.getDatasetId()), "Tried to get a dataset %s:%s from %s, but no such dataset was set", - projectId, - datasetId, - tableId, + tableRef.getProjectId(), + tableRef.getDatasetId(), + tableRef.getTableId(), FakeDatasetService.class.getSimpleName()); - TableContainer tableContainer = dataset.get(tableId); + TableContainer tableContainer = dataset.get(tableRef.getTableId()); return tableContainer == null ? null : tableContainer.getTable(); } } @@ -569,8 +570,7 @@ public class BigQueryIOTest implements Serializable { } @Override - public void deleteTable(String projectId, String datasetId, String tableId) - throws IOException, InterruptedException { + public void deleteTable(TableReference tableRef) throws IOException, InterruptedException { throw new UnsupportedOperationException("Unsupported"); } @@ -595,9 +595,9 @@ public class BigQueryIOTest implements Serializable { } @Override - public boolean isTableEmpty(String projectId, String datasetId, String tableId) + public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException { - Long numBytes = getTable(projectId, datasetId, tableId).getNumBytes(); + Long numBytes = getTable(tableRef).getNumBytes(); return numBytes == null || numBytes == 0L; } @@ -1738,7 +1738,7 @@ public class BigQueryIOTest implements Serializable { IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */); when(mockIOChannelFactory.resolve(anyString(), anyString())) .thenReturn("mock://tempLocation/output"); - when(mockDatasetService.getTable(anyString(), anyString(), anyString())) + when(mockDatasetService.getTable(any(TableReference.class))) .thenReturn(new Table().setSchema(new TableSchema())); Assert.assertThat( @@ -1810,13 +1810,9 @@ public class BigQueryIOTest implements Serializable { new JobStatistics2() .setTotalBytesProcessed(100L) .setReferencedTables(ImmutableList.of(queryTable)))); - when(mockDatasetService.getTable( - eq(queryTable.getProjectId()), eq(queryTable.getDatasetId()), eq(queryTable.getTableId()))) + when(mockDatasetService.getTable(eq(queryTable))) .thenReturn(new Table().setSchema(new TableSchema())); - when(mockDatasetService.getTable( - eq(destinationTable.getProjectId()), - eq(destinationTable.getDatasetId()), - eq(destinationTable.getTableId()))) + when(mockDatasetService.getTable(eq(destinationTable))) .thenReturn(new Table().setSchema(new TableSchema())); IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */); when(mockIOChannelFactory.resolve(anyString(), anyString())) @@ -1898,10 +1894,7 @@ public class BigQueryIOTest implements Serializable { .thenReturn(new JobStatistics().setQuery( new JobStatistics2() .setTotalBytesProcessed(100L))); - when(mockDatasetService.getTable( - eq(destinationTable.getProjectId()), - eq(destinationTable.getDatasetId()), - eq(destinationTable.getTableId()))) + when(mockDatasetService.getTable(eq(destinationTable))) .thenReturn(new Table().setSchema(new TableSchema())); IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */); when(mockIOChannelFactory.resolve(anyString(), anyString())) @@ -2263,9 +2256,9 @@ public class BigQueryIOTest implements Serializable { BigQueryIO.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, tables.get(2)))); doThrow(new IOException("Unable to delete table")) - .when(mockDatasetService).deleteTable(projectId, datasetId, tables.get(0)); - doNothing().when(mockDatasetService).deleteTable(projectId, datasetId, tables.get(1)); - doNothing().when(mockDatasetService).deleteTable(projectId, datasetId, tables.get(2)); + .when(mockDatasetService).deleteTable(tableRefs.get(0)); + doNothing().when(mockDatasetService).deleteTable(tableRefs.get(1)); + doNothing().when(mockDatasetService).deleteTable(tableRefs.get(2)); WriteRename.removeTemporaryTables(mockDatasetService, tableRefs); http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java index 8130238..7b5b226 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java @@ -370,7 +370,8 @@ public class BigQueryUtilTest { BigQueryServicesImpl.DatasetServiceImpl services = new BigQueryServicesImpl.DatasetServiceImpl(mockClient, options); - services.getTable("project", "dataset", "table"); + services.getTable( + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table")); verifyTableGet(); }
