Forward port Dataflow PR-431 to Beam
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cca861ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cca861ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cca861ba Branch: refs/heads/master Commit: cca861ba82a2e6ba6c6af122be0b8a9932d53cc5 Parents: ecbc641 Author: Pei He <pe...@google.com> Authored: Mon Oct 3 19:37:02 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Thu Oct 6 15:33:03 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/util/PropertyNames.java | 1 + .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 129 +++++++++++++------ .../sdk/io/gcp/bigquery/BigQueryServices.java | 5 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 16 ++- .../gcp/bigquery/BigQueryTableRowIterator.java | 14 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 44 +++++-- .../bigquery/BigQueryTableRowIteratorTest.java | 6 +- 7 files changed, 149 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java index cc9fa5e..b17bcad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java @@ -30,6 +30,7 @@ public class PropertyNames { public static final String BIGQUERY_TABLE = "table"; public static final String BIGQUERY_QUERY = "bigquery_query"; public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results"; + public static final String BIGQUERY_USE_LEGACY_SQL = 
"bigquery_use_legacy_sql"; public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition"; public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format"; public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema"; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 91f6073..6d20c3f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -384,6 +384,7 @@ public class BigQueryIO { */ final boolean validate; @Nullable final Boolean flattenResults; + @Nullable final Boolean useLegacySql; @Nullable BigQueryServices bigQueryServices; private static final String QUERY_VALIDATION_FAILURE_ERROR = @@ -397,17 +398,20 @@ public class BigQueryIO { null /* jsonTableRef */, true /* validate */, null /* flattenResults */, + null /* useLegacySql */, null /* bigQueryServices */); } private Bound( String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate, - @Nullable Boolean flattenResults, @Nullable BigQueryServices bigQueryServices) { + @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql, + @Nullable BigQueryServices bigQueryServices) { super(name); this.jsonTableRef = jsonTableRef; this.query = query; this.validate = validate; this.flattenResults = flattenResults; + this.useLegacySql = useLegacySql; this.bigQueryServices = bigQueryServices; } @@ -428,7 +432,8 @@ public class BigQueryIO { */ public Bound from(TableReference 
table) { return new Bound( - name, query, toJsonString(table), validate, flattenResults, bigQueryServices); + name, query, toJsonString(table), validate, flattenResults, useLegacySql, + bigQueryServices); } /** @@ -440,10 +445,15 @@ public class BigQueryIO { * "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs"> * Jobs documentation</a> for more information. To disable flattening, use * {@link BigQueryIO.Read.Bound#withoutResultFlattening}. + * + * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery + * Standard SQL dialect, use {@link BigQueryIO.Read.Bound#usingStandardSql}. */ public Bound fromQuery(String query) { return new Bound(name, query, jsonTableRef, validate, - MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), bigQueryServices); + MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), + MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE), + bigQueryServices); } /** @@ -452,7 +462,9 @@ public class BigQueryIO { * occurs. */ public Bound withoutValidation() { - return new Bound(name, query, jsonTableRef, false, flattenResults, bigQueryServices); + return new Bound( + name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql, + bigQueryServices); } /** @@ -463,12 +475,27 @@ public class BigQueryIO { * from a table will cause an error during validation. */ public Bound withoutResultFlattening() { - return new Bound(name, query, jsonTableRef, validate, false, bigQueryServices); + return new Bound( + name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql, + bigQueryServices); + } + + /** + * Enables BigQuery's Standard SQL dialect when reading from a query. + * + * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading + * from a table will cause an error during validation. 
+ */ + public Bound usingStandardSql() { + return new Bound( + name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */, + bigQueryServices); } @VisibleForTesting Bound withTestServices(BigQueryServices testServices) { - return new Bound(name, query, jsonTableRef, validate, flattenResults, testServices); + return new Bound( + name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices); } @Override @@ -494,39 +521,42 @@ public class BigQueryIO { } TableReference table = getTableWithDefaultProject(bqOptions); - if (table == null && query == null) { - throw new IllegalStateException( - "Invalid BigQuery read operation, either table reference or query has to be set"); - } else if (table != null && query != null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a" - + " query and a table, only one of these should be provided"); - } else if (table != null && flattenResults != null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" - + " table with a result flattening preference, which is not configurable"); - } else if (query != null && flattenResults == null) { - throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a" - + " query without a result flattening preference"); + + checkState( + table == null || query == null, + "Invalid BigQueryIO.Read: table reference and query may not both be set"); + checkState( + table != null || query != null, + "Invalid BigQueryIO.Read: one of table reference and query must be set"); + if (table != null) { + checkState( + flattenResults == null, + "Invalid BigQueryIO.Read: Specifies a table with a result flattening" + + " preference, which only applies to queries"); + checkState( + useLegacySql == null, + "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect" + + " preference, which only applies to queries"); + } else /* query != null */ { + checkState(flattenResults != null, "flattenResults should not be null if query is set"); + checkState(useLegacySql != null, "useLegacySql should not be null if query is set"); } - if (validate) { - BigQueryServices bqServices = getBigQueryServices(); - // Check for source table/query presence for early failure notification. - // Note that a presence check can fail if the table or dataset are created by earlier - // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these - // cases the withoutValidation method can be used to disable the check. - if (table != null) { - DatasetService datasetService = bqServices.getDatasetService(bqOptions); - verifyDatasetPresence(datasetService, table); - verifyTablePresence(datasetService, table); - } - if (query != null) { - JobService jobService = bqServices.getJobService(bqOptions); - try { - jobService.dryRunQuery(bqOptions.getProject(), query); - } catch (Exception e) { - throw new IllegalArgumentException( - String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e); - } + // Note that a table or query check can fail if the table or dataset are created by + // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline. 
+ // For these cases the withoutValidation method can be used to disable the check. + if (validate && table != null) { + // Check for source table presence for early failure notification. + DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); + verifyDatasetPresence(datasetService, table); + verifyTablePresence(datasetService, table); + } else if (validate && query != null) { + JobService jobService = getBigQueryServices().getJobService(bqOptions); + try { + jobService.dryRunQuery(bqOptions.getProject(), query, useLegacySql); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e); } } } @@ -562,7 +592,7 @@ public class BigQueryIO { .setTableId(queryTempTableId); source = BigQueryQuerySource.create( - jobIdToken, query, queryTempTableRef, flattenResults, + jobIdToken, query, queryTempTableRef, flattenResults, useLegacySql, extractDestinationDir, bqServices); } else { TableReference inputTable = getTableWithDefaultProject(bqOptions); @@ -621,6 +651,8 @@ public class BigQueryIO { .withLabel("Query")) .addIfNotNull(DisplayData.item("flattenResults", flattenResults) .withLabel("Flatten Query Results")) + .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql) + .withLabel("Use Legacy SQL Dialect")) .addIfNotDefault(DisplayData.item("validation", validate) .withLabel("Validation Enabled"), true); @@ -670,6 +702,15 @@ public class BigQueryIO { return flattenResults; } + /** + * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null + * if not applicable.
+ */ + @Nullable + public Boolean getUseLegacySql() { + return useLegacySql; + } + private BigQueryServices getBigQueryServices() { if (bigQueryServices == null) { bigQueryServices = new BigQueryServicesImpl(); @@ -811,6 +852,7 @@ public class BigQueryIO { String query, TableReference queryTempTableRef, Boolean flattenResults, + Boolean useLegacySql, String extractDestinationDir, BigQueryServices bqServices) { return new BigQueryQuerySource( @@ -818,6 +860,7 @@ public class BigQueryIO { query, queryTempTableRef, flattenResults, + useLegacySql, extractDestinationDir, bqServices); } @@ -825,6 +868,7 @@ public class BigQueryIO { private final String query; private final String jsonQueryTempTable; private final Boolean flattenResults; + private final Boolean useLegacySql; private transient AtomicReference<JobStatistics> dryRunJobStats; private BigQueryQuerySource( @@ -832,6 +876,7 @@ public class BigQueryIO { String query, TableReference queryTempTableRef, Boolean flattenResults, + Boolean useLegacySql, String extractDestinationDir, BigQueryServices bqServices) { super(jobIdToken, extractDestinationDir, bqServices, @@ -839,6 +884,7 @@ public class BigQueryIO { this.query = checkNotNull(query, "query"); this.jsonQueryTempTable = toJsonString(queryTempTableRef); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); + this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql"); this.dryRunJobStats = new AtomicReference<>(); } @@ -852,7 +898,7 @@ public class BigQueryIO { public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); return new BigQueryReader(this, bqServices.getReaderFromQuery( - bqOptions, query, executingProject, flattenResults)); + bqOptions, query, executingProject, flattenResults, useLegacySql)); } @Override @@ -883,6 +929,7 @@ public class BigQueryIO { query, tableToExtract, flattenResults, + useLegacySql, 
bqServices.getJobService(bqOptions)); return tableToExtract; } @@ -909,7 +956,7 @@ public class BigQueryIO { throws InterruptedException, IOException { if (dryRunJobStats.get() == null) { JobStatistics jobStats = - bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query); + bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query, useLegacySql); dryRunJobStats.compareAndSet(null, jobStats); } return dryRunJobStats.get(); @@ -921,6 +968,7 @@ public class BigQueryIO { String query, TableReference destinationTable, boolean flattenResults, + boolean useLegacySql, JobService jobService) throws IOException, InterruptedException { JobReference jobRef = new JobReference() .setProjectId(executingProject) @@ -932,6 +980,7 @@ public class BigQueryIO { .setCreateDisposition("CREATE_IF_NEEDED") .setDestinationTable(destinationTable) .setFlattenResults(flattenResults) + .setUseLegacySql(useLegacySql) .setPriority("BATCH") .setWriteDisposition("WRITE_EMPTY"); jobService.startQueryJob(jobRef, queryConfig); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 16b3a39..eb77f12 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -57,7 +57,8 @@ interface BigQueryServices extends Serializable { * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables. 
*/ BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten); + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, + @Nullable Boolean useLegacySql); /** * An interface for the Cloud BigQuery load service. @@ -97,7 +98,7 @@ interface BigQueryServices extends Serializable { /** * Dry runs the query in the given project. */ - JobStatistics dryRunQuery(String projectId, String query) + JobStatistics dryRunQuery(String projectId, String query, boolean useLegacySql) throws InterruptedException, IOException; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 3862382..ad2d4ed 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -97,8 +97,9 @@ class BigQueryServicesImpl implements BigQueryServices { @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) { - return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten); + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, + @Nullable Boolean useLegacySql) { + return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten, useLegacySql); } @VisibleForTesting @@ -265,12 +266,13 @@ class BigQueryServicesImpl implements BigQueryServices { } @Override 
- public JobStatistics dryRunQuery(String projectId, String query) + public JobStatistics dryRunQuery(String projectId, String query, boolean useLegacySql) throws InterruptedException, IOException { Job job = new Job() .setConfiguration(new JobConfiguration() .setQuery(new JobConfigurationQuery() - .setQuery(query)) + .setQuery(query) + .setUseLegacySql(useLegacySql)) .setDryRun(true)); BackOff backoff = FluentBackoff.DEFAULT @@ -697,10 +699,12 @@ class BigQueryServicesImpl implements BigQueryServices { BigQueryOptions bqOptions, String query, String projectId, - @Nullable Boolean flattenResults) { + @Nullable Boolean flattenResults, + @Nullable Boolean useLegacySql) { return new BigQueryJsonReaderImpl( BigQueryTableRowIterator.fromQuery( - query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults)); + query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults, + useLegacySql)); } private static BigQueryJsonReader fromTable( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java index 64b1dc6..608995a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java @@ -91,6 +91,8 @@ class BigQueryTableRowIterator implements AutoCloseable { private final String query; // Whether to flatten query results. private final boolean flattenResults; + // Whether to use the BigQuery legacy SQL dialect.. 
+ private final boolean useLegacySql; // Temporary dataset used to store query results. private String temporaryDatasetId = null; // Temporary table used to store query results. @@ -98,12 +100,13 @@ class BigQueryTableRowIterator implements AutoCloseable { private BigQueryTableRowIterator( @Nullable TableReference ref, @Nullable String query, @Nullable String projectId, - Bigquery client, boolean flattenResults) { + Bigquery client, boolean flattenResults, boolean useLegacySql) { this.ref = ref; this.query = query; this.projectId = projectId; this.client = checkNotNull(client, "client"); this.flattenResults = flattenResults; + this.useLegacySql = useLegacySql; } /** @@ -112,7 +115,7 @@ class BigQueryTableRowIterator implements AutoCloseable { public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) { checkNotNull(ref, "ref"); checkNotNull(client, "client"); - return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true); + return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true); } /** @@ -120,12 +123,14 @@ class BigQueryTableRowIterator implements AutoCloseable { * specified query in the specified project. 
*/ public static BigQueryTableRowIterator fromQuery( - String query, String projectId, Bigquery client, @Nullable Boolean flattenResults) { + String query, String projectId, Bigquery client, @Nullable Boolean flattenResults, + @Nullable Boolean useLegacySql) { checkNotNull(query, "query"); checkNotNull(projectId, "projectId"); checkNotNull(client, "client"); return new BigQueryTableRowIterator(null, query, projectId, client, - MoreObjects.firstNonNull(flattenResults, Boolean.TRUE)); + MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), + MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE)); } /** @@ -416,6 +421,7 @@ class BigQueryTableRowIterator implements AutoCloseable { queryConfig.setQuery(query); queryConfig.setAllowLargeResults(true); queryConfig.setFlattenResults(flattenResults); + queryConfig.setUseLegacySql(useLegacySql); TableReference destinationTable = new TableReference(); destinationTable.setProjectId(projectId); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 78a4c4f..d2c6715 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing; @@ -182,7 +183,8 @@ public class BigQueryIOTest implements Serializable { @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) { + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, + @Nullable Boolean useLegacySql) { return new FakeBigQueryReader(jsonTableRowReturns); } @@ -360,7 +362,7 @@ public class BigQueryIOTest implements Serializable { } @Override - public JobStatistics dryRunQuery(String projectId, String query) + public JobStatistics dryRunQuery(String projectId, String query, boolean useLegacySql) throws InterruptedException, IOException { throw new UnsupportedOperationException(); } @@ -537,7 +539,7 @@ public class BigQueryIOTest implements Serializable { Pipeline p = TestPipeline.create(bqOptions); thrown.expect(IllegalStateException.class); thrown.expectMessage( - "Invalid BigQuery read operation, either table reference or query has to be set"); + "Invalid BigQueryIO.Read: one of table reference and query must be set"); p.apply(BigQueryIO.Read.withoutValidation()); p.run(); } @@ -552,8 +554,7 @@ public class BigQueryIOTest implements Serializable { Pipeline p = TestPipeline.create(bqOptions); thrown.expect(IllegalStateException.class); thrown.expectMessage( - "Invalid BigQuery read operation. Specifies both a query and a table, only one of these" - + " should be provided"); + "Invalid BigQueryIO.Read: table reference and query may not both be set"); p.apply("ReadMyTable", BigQueryIO.Read .from("foo.com:project:somedataset.sometable") @@ -571,8 +572,8 @@ public class BigQueryIOTest implements Serializable { Pipeline p = TestPipeline.create(bqOptions); thrown.expect(IllegalStateException.class); thrown.expectMessage( - "Invalid BigQuery read operation. 
Specifies a" - + " table with a result flattening preference, which is not configurable"); + "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference," + + " which only applies to queries"); p.apply("ReadMyTable", BigQueryIO.Read .from("foo.com:project:somedataset.sometable") @@ -590,8 +591,8 @@ public class BigQueryIOTest implements Serializable { Pipeline p = TestPipeline.create(bqOptions); thrown.expect(IllegalStateException.class); thrown.expectMessage( - "Invalid BigQuery read operation. Specifies a" - + " table with a result flattening preference, which is not configurable"); + "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference," + + " which only applies to queries"); p.apply( BigQueryIO.Read .from("foo.com:project:somedataset.sometable") @@ -601,6 +602,25 @@ public class BigQueryIOTest implements Serializable { } @Test + @Category(RunnableOnService.class) + public void testBuildSourceWithTableAndSqlDialect() { + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation("gs://testbucket/testdir"); + + Pipeline p = TestPipeline.create(bqOptions); + thrown.expect(IllegalStateException.class); + thrown.expectMessage( + "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference," + + " which only applies to queries"); + p.apply( + BigQueryIO.Read + .from("foo.com:project:somedataset.sometable") + .usingStandardSql()); + p.run(); + } + + @Test @Category(NeedsRunner.class) public void testReadFromTable() throws IOException { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); @@ -712,6 +732,7 @@ public class BigQueryIOTest implements Serializable { .from(tableSpec) .fromQuery("myQuery") .withoutResultFlattening() + .usingStandardSql() .withoutValidation(); DisplayData displayData = DisplayData.from(read); @@ -719,6 +740,7 @@ public class BigQueryIOTest implements 
Serializable { assertThat(displayData, hasDisplayItem("table", tableSpec)); assertThat(displayData, hasDisplayItem("query", "myQuery")); assertThat(displayData, hasDisplayItem("flattenResults", false)); + assertThat(displayData, hasDisplayItem("useLegacySql", false)); assertThat(displayData, hasDisplayItem("validation", false)); } @@ -1189,7 +1211,7 @@ public class BigQueryIOTest implements Serializable { String extractDestinationDir = "mock://tempLocation"; TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name"); BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( - jobIdToken, "query", destinationTable, true /* flattenResults */, + jobIdToken, "query", destinationTable, true /* flattenResults */, true /* useLegacySql */, extractDestinationDir, fakeBqServices); List<TableRow> expected = ImmutableList.of( @@ -1204,7 +1226,7 @@ public class BigQueryIOTest implements Serializable { .setProjectId("testProejct") .setDatasetId("testDataset") .setTableId("testTable"); - when(mockJobService.dryRunQuery(anyString(), anyString())) + when(mockJobService.dryRunQuery(anyString(), anyString(), anyBoolean())) .thenReturn(new JobStatistics().setQuery( new JobStatistics2() .setTotalBytesProcessed(100L) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java index 29a1704..a41b455 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java @@ -199,7 +199,7 @@ public class BigQueryTableRowIteratorTest { String query = "SELECT name, count, photo, anniversary_date, " + "anniversary_datetime, anniversary_time from table"; try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) { + BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { iterator.open(); assertTrue(iterator.advance()); TableRow row = iterator.getCurrent(); @@ -281,7 +281,7 @@ public class BigQueryTableRowIteratorTest { "SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo", photoBytesEncoded); try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) { + BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { iterator.open(); assertTrue(iterator.advance()); TableRow row = iterator.getCurrent(); @@ -334,7 +334,7 @@ public class BigQueryTableRowIteratorTest { String query = "NOT A QUERY"; try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) { + BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { try { iterator.open();