Repository: beam Updated Branches: refs/heads/master b494c4404 -> 41239d808
Sets a TTL on BigQueryIO.read().fromQuery() temp dataset. Also fixes a bug where we start the query job twice - once to extract the files, once to get the schema. Luckily it doesn't actually run twice, because inserting the same job a second time gives an ignorable error, but it was still icky. Also adds some logging. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f54477ce Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f54477ce Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f54477ce Branch: refs/heads/master Commit: f54477cecdc697b0e539f183be671028f738a0da Parents: b494c44 Author: Eugene Kirpichov <[email protected]> Authored: Thu Sep 21 19:01:43 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Sep 26 21:47:27 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 10 ++--- .../io/gcp/bigquery/BigQueryQuerySource.java | 19 ++++++++- .../sdk/io/gcp/bigquery/BigQueryServices.java | 9 ++++- .../io/gcp/bigquery/BigQueryServicesImpl.java | 18 ++++++++- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 34 ++++++++-------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 41 ++++++++++---------- .../sdk/io/gcp/bigquery/FakeDatasetService.java | 6 ++- 7 files changed, 88 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3a4b699..e0b86b6 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -582,14 +582,12 @@ public class BigQueryIO { public void processElement(ProcessContext c) throws Exception { String jobUuid = c.element(); BigQuerySourceBase source = createSource(jobUuid); - String schema = - BigQueryHelpers.toJsonString( - source.getSchema(c.getPipelineOptions())); - c.output(tableSchemaTag, schema); - List<ResourceId> files = source.extractFiles(c.getPipelineOptions()); - for (ResourceId file : files) { + BigQuerySourceBase.ExtractResult res = + source.extractFiles(c.getPipelineOptions()); + for (ResourceId file : res.extractedFiles) { c.output(file.toString()); } + c.output(tableSchemaTag, BigQueryHelpers.toJsonString(res.schema)); } }) .withOutputTags(filesTag, TupleTagList.of(tableSchemaTag))); http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java index aee88e5..2572e19 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java @@ -39,6 +39,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -46,6 
+48,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; */ @VisibleForTesting class BigQueryQuerySource extends BigQuerySourceBase { + private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySource.class); static BigQueryQuerySource create( String stepUuid, @@ -109,19 +112,31 @@ class BigQueryQuerySource extends BigQuerySourceBase { TableReference tableToExtract = createTempTableReference( bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid)); + LOG.info("Creating temporary dataset {} for query results", tableToExtract.getDatasetId()); tableService.createDataset( tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, - "Dataset for BigQuery query job temporary table"); + "Temporary tables for query results of job " + bqOptions.getJobName(), + // Set a TTL of 1 day on the temporary tables, which ought to be enough in all cases: + // the temporary tables are used only to immediately extract them into files. + // They are normally cleaned up, but in case of job failure the cleanup step may not run, + // and then they'll get deleted after the TTL. + 24 * 3600 * 1000L /* 1 day */); // 3. Execute the query. 
String queryJobId = createJobIdToken(bqOptions.getJobName(), stepUuid) + "-query"; + LOG.info( + "Exporting query results into temporary table {} using job {}", + tableToExtract, + queryJobId); executeQuery( bqOptions.getProject(), queryJobId, tableToExtract, bqServices.getJobService(bqOptions)); + LOG.info("Query job {} completed", queryJobId); + return tableToExtract; } @@ -131,7 +146,9 @@ class BigQueryQuerySource extends BigQuerySourceBase { bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid)); DatasetService tableService = bqServices.getDatasetService(bqOptions); + LOG.info("Deleting temporary table with query results {}", tableToRemove); tableService.deleteTable(tableToRemove); + LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId()); tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId()); } http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index c067229..740170a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -145,10 +145,15 @@ interface BigQueryServices extends Serializable { throws IOException, InterruptedException; /** - * Create a {@link Dataset} with the given {@code location} and {@code description}. 
+ * Create a {@link Dataset} with the given {@code location}, {@code description} and default + * expiration time for tables in the dataset (if {@code null}, tables don't expire). */ void createDataset( - String projectId, String datasetId, @Nullable String location, @Nullable String description) + String projectId, + String datasetId, + @Nullable String location, + @Nullable String description, + @Nullable Long defaultTableExpirationMs) throws IOException, InterruptedException; /** http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index b14405e..b37e95e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -586,10 +586,20 @@ class BigQueryServicesImpl implements BigQueryServices { */ @Override public void createDataset( - String projectId, String datasetId, @Nullable String location, @Nullable String description) + String projectId, + String datasetId, + @Nullable String location, + @Nullable String description, + @Nullable Long defaultTableExpirationMs) throws IOException, InterruptedException { createDataset( - projectId, datasetId, location, description, Sleeper.DEFAULT, createDefaultBackoff()); + projectId, + datasetId, + location, + description, + defaultTableExpirationMs, + Sleeper.DEFAULT, + createDefaultBackoff()); } private void createDataset( @@ -597,6 +607,7 @@ class BigQueryServicesImpl implements BigQueryServices { String datasetId, @Nullable 
String location, @Nullable String description, + @Nullable Long defaultTableExpirationMs, Sleeper sleeper, BackOff backoff) throws IOException, InterruptedException { DatasetReference datasetRef = new DatasetReference() @@ -611,6 +622,9 @@ class BigQueryServicesImpl implements BigQueryServices { dataset.setFriendlyName(description); dataset.setDescription(description); } + if (defaultTableExpirationMs != null) { + dataset.setDefaultTableExpirationMs(defaultTableExpirationMs); + } Exception lastException; do { http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index abe559c..08f091f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -80,17 +80,21 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { this.bqServices = checkNotNull(bqServices, "bqServices"); } - protected TableSchema getSchema(PipelineOptions options) throws Exception { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - TableReference tableToExtract = getTableToExtract(bqOptions); - TableSchema tableSchema = - bqServices.getDatasetService(bqOptions).getTable(tableToExtract).getSchema(); - return tableSchema; + protected static class ExtractResult { + public final TableSchema schema; + public final List<ResourceId> extractedFiles; + + public ExtractResult(TableSchema schema, List<ResourceId> extractedFiles) { + this.schema = schema; + this.extractedFiles = extractedFiles; 
+ } } - protected List<ResourceId> extractFiles(PipelineOptions options) throws Exception { + protected ExtractResult extractFiles(PipelineOptions options) throws Exception { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); + TableSchema schema = + bqServices.getDatasetService(bqOptions).getTable(tableToExtract).getSchema(); JobService jobService = bqServices.getJobService(bqOptions); String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); final String extractDestinationDir = @@ -102,7 +106,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { jobService, bqOptions.getProject(), extractDestinationDir); - return tempFiles; + return new ExtractResult(schema, tempFiles); } @Override @@ -113,12 +117,10 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { // We ignore desiredBundleSizeBytes anyway, however in any case, we should not initiate // another BigQuery extract job for the repeated split() calls. 
if (cachedSplitResult == null) { - List<ResourceId> tempFiles = extractFiles(options); - TableSchema tableSchema = getSchema(options); - - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - cleanupTempResource(bqOptions); - cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema)); + ExtractResult res = extractFiles(options); + LOG.info("Extract job produced {} files", res.extractedFiles.size()); + cleanupTempResource(options.as(BigQueryOptions.class)); + cachedSplitResult = checkNotNull(createSources(res.extractedFiles, res.schema)); } return cachedSplitResult; } @@ -167,9 +169,9 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob); } - List<BoundedSource<TableRow>> createSources(List<ResourceId> files, TableSchema tableSchema) + List<BoundedSource<TableRow>> createSources(List<ResourceId> files, TableSchema schema) throws IOException, InterruptedException { - final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema); + final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema); SerializableFunction<GenericRecord, TableRow> function = new SerializableFunction<GenericRecord, TableRow>() { http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 5500b12..b033aa8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -298,7 +298,7 @@ public class 
BigQueryIOTest implements Serializable { bqOptions.setTempLocation(baseDir.toString()); FakeDatasetService fakeDatasetService = new FakeDatasetService(); - fakeDatasetService.createDataset(projectId, datasetId, "", ""); + fakeDatasetService.createDataset(projectId, datasetId, "", "", null); TableReference tableReference = new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId); fakeDatasetService.createTable(new Table() @@ -436,7 +436,7 @@ public class BigQueryIOTest implements Serializable { .setTableId("sometable")); sometable.setNumBytes(1024L * 1024L); FakeDatasetService fakeDatasetService = new FakeDatasetService(); - fakeDatasetService.createDataset("non-executing-project", "somedataset", "", ""); + fakeDatasetService.createDataset("non-executing-project", "somedataset", "", "", null); fakeDatasetService.createTable(sometable); List<TableRow> records = Lists.newArrayList( @@ -492,7 +492,7 @@ public class BigQueryIOTest implements Serializable { .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); Pipeline p = TestPipeline.create(bqOptions); @@ -535,7 +535,7 @@ public class BigQueryIOTest implements Serializable { .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); final Pattern userPattern = Pattern.compile("([a-z]+)([0-9]+)"); Pipeline p = TestPipeline.create(bqOptions); @@ -685,7 +685,7 @@ public class BigQueryIOTest implements Serializable { new FakeBigQueryServices() .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); Pipeline p = 
TestPipeline.create(bqOptions); TableRow row1 = new TableRow().set("name", "a").set("number", "1"); @@ -733,7 +733,7 @@ public class BigQueryIOTest implements Serializable { elements.add(new TableRow().set("number", i)); } - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); TestStream<TableRow> testStream = TestStream.create(TableRowJsonCoder.of()) .addElements( @@ -780,7 +780,7 @@ public class BigQueryIOTest implements Serializable { .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); TableRow row1 = new TableRow().set("name", "a").set("number", "1"); TableRow row2 = new TableRow().set("name", "b").set("number", "2"); @@ -827,7 +827,7 @@ public class BigQueryIOTest implements Serializable { .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); TableRow row1 = new TableRow().set("name", "a").set("number", "1"); TableRow row2 = new TableRow().set("name", "b").set("number", "2"); @@ -880,7 +880,7 @@ public class BigQueryIOTest implements Serializable { .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("defaultproject", "dataset-id", "", ""); + datasetService.createDataset("defaultproject", "dataset-id", "", "", null); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -910,7 +910,7 @@ public class BigQueryIOTest implements Serializable { bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); FakeDatasetService datasetService = new FakeDatasetService(); - datasetService.createDataset("project-id", "dataset-id", "", ""); + 
datasetService.createDataset("project-id", "dataset-id", "", "", null); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withDatasetService(datasetService); @@ -1072,7 +1072,7 @@ public class BigQueryIOTest implements Serializable { bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); FakeDatasetService datasetService = new FakeDatasetService(); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withDatasetService(datasetService) .withJobService(new FakeJobService()); @@ -1173,7 +1173,7 @@ public class BigQueryIOTest implements Serializable { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -1639,7 +1639,7 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "f").set("number", "6")); TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); - datasetService.createDataset(table.getProjectId(), table.getDatasetId(), "", ""); + datasetService.createDataset(table.getProjectId(), table.getDatasetId(), "", "", null); datasetService.createTable(new Table().setTableReference(table)); datasetService.insertAll(table, expected, null); @@ -1674,7 +1674,7 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "f").set("number", 6L)); TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); - fakeDatasetService.createDataset("project", "data_set", "", ""); + fakeDatasetService.createDataset("project", "data_set", "", "", null); fakeDatasetService.createTable(new 
Table().setTableReference(table) .setSchema(new TableSchema() .setFields( @@ -1751,7 +1751,7 @@ public class BigQueryIOTest implements Serializable { TableReference tempTableReference = createTempTableReference( bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid)); fakeDatasetService.createDataset( - bqOptions.getProject(), tempTableReference.getDatasetId(), "", ""); + bqOptions.getProject(), tempTableReference.getDatasetId(), "", "", null); fakeDatasetService.createTable(new Table() .setTableReference(tempTableReference) .setSchema(new TableSchema() @@ -1829,7 +1829,7 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "e").set("number", 5L), new TableRow().set("name", "f").set("number", 6L)); datasetService.createDataset( - tempTableReference.getProjectId(), tempTableReference.getDatasetId(), "", ""); + tempTableReference.getProjectId(), tempTableReference.getDatasetId(), "", "", null); Table table = new Table() .setTableReference(tempTableReference) .setSchema(new TableSchema() @@ -2072,10 +2072,9 @@ public class BigQueryIOTest implements Serializable { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); Pipeline p = TestPipeline.create(bqOptions); - long numTables = 3; long numPartitions = 3; long numFilesPerPartition = 10; @@ -2182,7 +2181,7 @@ public class BigQueryIOTest implements Serializable { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); final int numFinalTables = 3; final int numTempTablesPerFinalTable = 3; @@ -2256,7 +2255,7 @@ public class 
BigQueryIOTest implements Serializable { FakeDatasetService datasetService = new FakeDatasetService(); String projectId = "project"; String datasetId = "dataset"; - datasetService.createDataset(projectId, datasetId, "", ""); + datasetService.createDataset(projectId, datasetId, "", "", null); List<TableReference> tableRefs = Lists.newArrayList( BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table1")), BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table2")), @@ -2391,7 +2390,7 @@ public class BigQueryIOTest implements Serializable { new FakeBigQueryServices() .withJobService(new FakeJobService()) .withDatasetService(datasetService); - datasetService.createDataset("project-id", "dataset-id", "", ""); + datasetService.createDataset("project-id", "dataset-id", "", "", null); Pipeline p = TestPipeline.create(bqOptions); TableRow row1 = new TableRow().set("name", "a").set("number", "1"); http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java index 323f663..4c67a9c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java @@ -164,7 +164,11 @@ class FakeDatasetService implements DatasetService, Serializable { @Override public void createDataset( - String projectId, String datasetId, String location, String description) + String projectId, + String datasetId, + String location, + String description, + Long 
defaultTableExpirationMs /* ignored */) throws IOException, InterruptedException { synchronized (BigQueryIOTest.tables) { Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(projectId, datasetId);
