Repository: beam
Updated Branches:
  refs/heads/master b494c4404 -> 41239d808


Sets a TTL on BigQueryIO.read().fromQuery() temp dataset

Also fixes a bug where we start the query job twice:
once to extract the files, and once to get the schema. Luckily it doesn't
actually run twice, because inserting the same job a second time gives
an ignorable error, but it was still icky.

Also adds some logging.
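
For context, here is a minimal self-contained sketch of the pattern the fix
adopts, with simplified stand-in types rather than the real Beam classes
(String for TableSchema, List<String> for List<ResourceId>): a single
extractFiles() call now returns both the extracted files and the schema, so
the job is only inserted once.

    import java.util.Arrays;
    import java.util.List;

    // Illustrative sketch only; not the actual BigQuerySourceBase code.
    public class ExtractResultSketch {
      static final class ExtractResult {
        final String schemaJson;           // stand-in for TableSchema
        final List<String> extractedFiles; // stand-in for List<ResourceId>

        ExtractResult(String schemaJson, List<String> extractedFiles) {
          this.schemaJson = schemaJson;
          this.extractedFiles = extractedFiles;
        }
      }

      // The real method looks up the table schema and runs one BigQuery
      // extract job; here we just return canned values.
      static ExtractResult extractFiles() {
        return new ExtractResult("{\"fields\":[]}",
            Arrays.asList("gs://temp/file-00000.avro"));
      }

      public static void main(String[] args) {
        ExtractResult res = extractFiles(); // one call yields both results
        System.out.println(
            res.extractedFiles.size() + " file(s), schema " + res.schemaJson);
      }
    }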


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f54477ce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f54477ce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f54477ce

Branch: refs/heads/master
Commit: f54477cecdc697b0e539f183be671028f738a0da
Parents: b494c44
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Sep 21 19:01:43 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Tue Sep 26 21:47:27 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 10 ++---
 .../io/gcp/bigquery/BigQueryQuerySource.java    | 19 ++++++++-
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  9 ++++-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 18 ++++++++-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 34 ++++++++--------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 41 ++++++++++----------
 .../sdk/io/gcp/bigquery/FakeDatasetService.java |  6 ++-
 7 files changed, 88 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3a4b699..e0b86b6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -582,14 +582,12 @@ public class BigQueryIO {
                           public void processElement(ProcessContext c) throws Exception {
                             String jobUuid = c.element();
                             BigQuerySourceBase source = createSource(jobUuid);
-                            String schema =
-                                BigQueryHelpers.toJsonString(
-                                    source.getSchema(c.getPipelineOptions()));
-                            c.output(tableSchemaTag, schema);
-                            List<ResourceId> files = source.extractFiles(c.getPipelineOptions());
-                            for (ResourceId file : files) {
+                            BigQuerySourceBase.ExtractResult res =
+                                source.extractFiles(c.getPipelineOptions());
+                            for (ResourceId file : res.extractedFiles) {
                               c.output(file.toString());
                             }
+                            c.output(tableSchemaTag, BigQueryHelpers.toJsonString(res.schema));
                           }
                         })
                     .withOutputTags(filesTag, TupleTagList.of(tableSchemaTag)));

http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index aee88e5..2572e19 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -39,6 +39,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -46,6 +48,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
  */
 @VisibleForTesting
 class BigQueryQuerySource extends BigQuerySourceBase {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySource.class);
 
   static BigQueryQuerySource create(
       String stepUuid,
@@ -109,19 +112,31 @@ class BigQueryQuerySource extends BigQuerySourceBase {
     TableReference tableToExtract = createTempTableReference(
         bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
 
+    LOG.info("Creating temporary dataset {} for query results", tableToExtract.getDatasetId());
     tableService.createDataset(
         tableToExtract.getProjectId(),
         tableToExtract.getDatasetId(),
         location,
-        "Dataset for BigQuery query job temporary table");
+        "Temporary tables for query results of job " + bqOptions.getJobName(),
+        // Set a TTL of 1 day on the temporary tables, which ought to be enough in all cases:
+        // the temporary tables are used only to immediately extract them into files.
+        // They are normally cleaned up, but in case of job failure the cleanup step may not run,
+        // and then they'll get deleted after the TTL.
+        24 * 3600 * 1000L /* 1 day */);
 
     // 3. Execute the query.
     String queryJobId = createJobIdToken(bqOptions.getJobName(), stepUuid) + "-query";
+    LOG.info(
+        "Exporting query results into temporary table {} using job {}",
+        tableToExtract,
+        queryJobId);
     executeQuery(
         bqOptions.getProject(),
         queryJobId,
         tableToExtract,
         bqServices.getJobService(bqOptions));
+    LOG.info("Query job {} completed", queryJobId);
+
     return tableToExtract;
   }
 
@@ -131,7 +146,9 @@ class BigQueryQuerySource extends BigQuerySourceBase {
         bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
 
     DatasetService tableService = bqServices.getDatasetService(bqOptions);
+    LOG.info("Deleting temporary table with query results {}", tableToRemove);
     tableService.deleteTable(tableToRemove);
+    LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId());
     tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
   }
 

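
The 24 * 3600 * 1000L literal above is one day in milliseconds (86,400,000).
Below is a hedged sketch of setting the same default-table-expiration field
on the BigQuery API's Dataset model object, as BigQueryServicesImpl does
further down; the project and dataset names are hypothetical.

    import com.google.api.services.bigquery.model.Dataset;
    import com.google.api.services.bigquery.model.DatasetReference;
    import java.util.concurrent.TimeUnit;

    public class TempDatasetTtlSketch {
      public static void main(String[] args) {
        Dataset dataset = new Dataset()
            .setDatasetReference(new DatasetReference()
                .setProjectId("my-project")          // hypothetical
                .setDatasetId("beam_temp_dataset"))  // hypothetical
            // One-day TTL: temp tables get deleted automatically even if the
            // pipeline's cleanup step never runs (e.g. on job failure).
            .setDefaultTableExpirationMs(TimeUnit.DAYS.toMillis(1));
        // Prints 86400000, the same value as 24 * 3600 * 1000L in the diff.
        System.out.println(dataset.getDefaultTableExpirationMs());
      }
    }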
http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index c067229..740170a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -145,10 +145,15 @@ interface BigQueryServices extends Serializable {
         throws IOException, InterruptedException;
 
     /**
-     * Create a {@link Dataset} with the given {@code location} and {@code description}.
+     * Create a {@link Dataset} with the given {@code location}, {@code description} and default
+     * expiration time for tables in the dataset (if {@code null}, tables don't expire).
      */
     void createDataset(
-        String projectId, String datasetId, @Nullable String location, @Nullable String description)
+        String projectId,
+        String datasetId,
+        @Nullable String location,
+        @Nullable String description,
+        @Nullable Long defaultTableExpirationMs)
         throws IOException, InterruptedException;
 
     /**

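
In caller terms, the new nullable parameter gives two call shapes. A sketch
under the assumption that the code lives in the same package as the
(package-private) BigQueryServices interface, with hypothetical project and
dataset names:

    // Sketch only; not part of the commit.
    void createBothKinds(BigQueryServices.DatasetService datasetService)
        throws java.io.IOException, InterruptedException {
      // null => tables in the dataset never expire (the pre-change behavior).
      datasetService.createDataset(
          "project-id", "permanent-dataset", "US", "kept indefinitely", null);
      // One-day default TTL, matching what BigQueryQuerySource now sets.
      datasetService.createDataset(
          "project-id", "temp-dataset", "US", "temp query results", 24 * 3600 * 1000L);
    }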
http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index b14405e..b37e95e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -586,10 +586,20 @@ class BigQueryServicesImpl implements BigQueryServices {
      */
     @Override
     public void createDataset(
-        String projectId, String datasetId, @Nullable String location, @Nullable String description)
+        String projectId,
+        String datasetId,
+        @Nullable String location,
+        @Nullable String description,
+        @Nullable Long defaultTableExpirationMs)
         throws IOException, InterruptedException {
       createDataset(
-          projectId, datasetId, location, description, Sleeper.DEFAULT, createDefaultBackoff());
+          projectId,
+          datasetId,
+          location,
+          description,
+          defaultTableExpirationMs,
+          Sleeper.DEFAULT,
+          createDefaultBackoff());
     }
 
     private void createDataset(
@@ -597,6 +607,7 @@ class BigQueryServicesImpl implements BigQueryServices {
         String datasetId,
         @Nullable String location,
         @Nullable String description,
+        @Nullable Long defaultTableExpirationMs,
         Sleeper sleeper,
         BackOff backoff) throws IOException, InterruptedException {
       DatasetReference datasetRef = new DatasetReference()
@@ -611,6 +622,9 @@ class BigQueryServicesImpl implements BigQueryServices {
         dataset.setFriendlyName(description);
         dataset.setDescription(description);
       }
+      if (defaultTableExpirationMs != null) {
+        dataset.setDefaultTableExpirationMs(defaultTableExpirationMs);
+      }
 
       Exception lastException;
       do {

http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index abe559c..08f091f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -80,17 +80,21 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     this.bqServices = checkNotNull(bqServices, "bqServices");
   }
 
-  protected TableSchema getSchema(PipelineOptions options) throws Exception {
-    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    TableReference tableToExtract = getTableToExtract(bqOptions);
-    TableSchema tableSchema =
-        bqServices.getDatasetService(bqOptions).getTable(tableToExtract).getSchema();
-    return tableSchema;
+  protected static class ExtractResult {
+    public final TableSchema schema;
+    public final List<ResourceId> extractedFiles;
+
+    public ExtractResult(TableSchema schema, List<ResourceId> extractedFiles) {
+      this.schema = schema;
+      this.extractedFiles = extractedFiles;
+    }
   }
 
-  protected List<ResourceId> extractFiles(PipelineOptions options) throws Exception {
+  protected ExtractResult extractFiles(PipelineOptions options) throws Exception {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     TableReference tableToExtract = getTableToExtract(bqOptions);
+    TableSchema schema =
+        bqServices.getDatasetService(bqOptions).getTable(tableToExtract).getSchema();
     JobService jobService = bqServices.getJobService(bqOptions);
     String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
     final String extractDestinationDir =
@@ -102,7 +106,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
             jobService,
             bqOptions.getProject(),
             extractDestinationDir);
-    return tempFiles;
+    return new ExtractResult(schema, tempFiles);
   }
 
   @Override
@@ -113,12 +117,10 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     // We ignore desiredBundleSizeBytes anyway, however in any case, we should not initiate
     // another BigQuery extract job for the repeated split() calls.
     if (cachedSplitResult == null) {
-      List<ResourceId> tempFiles = extractFiles(options);
-      TableSchema tableSchema = getSchema(options);
-
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      cleanupTempResource(bqOptions);
-      cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema));
+      ExtractResult res = extractFiles(options);
+      LOG.info("Extract job produced {} files", res.extractedFiles.size());
+      cleanupTempResource(options.as(BigQueryOptions.class));
+      cachedSplitResult = checkNotNull(createSources(res.extractedFiles, res.schema));
     }
     return cachedSplitResult;
   }
@@ -167,9 +169,9 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
   }
 
-  List<BoundedSource<TableRow>> createSources(List<ResourceId> files, TableSchema tableSchema)
+  List<BoundedSource<TableRow>> createSources(List<ResourceId> files, TableSchema schema)
       throws IOException, InterruptedException {
-    final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);
+    final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
 
     SerializableFunction<GenericRecord, TableRow> function =
         new SerializableFunction<GenericRecord, TableRow>() {

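
The split() change above keeps the existing guard against re-running the
extract job on repeated split() calls. For clarity, a generic, self-contained
sketch of that compute-once idiom (illustrative names, not the Beam classes):

    import java.util.List;
    import java.util.function.Supplier;

    // Compute-once caching, as split() does with cachedSplitResult: the
    // expensive work runs on the first call and is reused afterwards.
    public final class MemoizedSplit<T> {
      private final Supplier<List<T>> expensiveSplit;
      private List<T> cached; // null until the first split() call

      public MemoizedSplit(Supplier<List<T>> expensiveSplit) {
        this.expensiveSplit = expensiveSplit;
      }

      public List<T> split() {
        if (cached == null) {
          cached = expensiveSplit.get(); // e.g. run the extract job once
        }
        return cached;
      }
    }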
http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 5500b12..b033aa8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -298,7 +298,7 @@ public class BigQueryIOTest implements Serializable {
     bqOptions.setTempLocation(baseDir.toString());
 
     FakeDatasetService fakeDatasetService = new FakeDatasetService();
-    fakeDatasetService.createDataset(projectId, datasetId, "", "");
+    fakeDatasetService.createDataset(projectId, datasetId, "", "", null);
     TableReference tableReference =
         new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId);
     fakeDatasetService.createTable(new Table()
@@ -436,7 +436,7 @@ public class BigQueryIOTest implements Serializable {
             .setTableId("sometable"));
     sometable.setNumBytes(1024L * 1024L);
     FakeDatasetService fakeDatasetService = new FakeDatasetService();
-    fakeDatasetService.createDataset("non-executing-project", "somedataset", "", "");
+    fakeDatasetService.createDataset("non-executing-project", "somedataset", "", "", null);
     fakeDatasetService.createTable(sometable);
 
     List<TableRow> records = Lists.newArrayList(
@@ -492,7 +492,7 @@ public class BigQueryIOTest implements Serializable {
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
 
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
 
     Pipeline p = TestPipeline.create(bqOptions);
 
@@ -535,7 +535,7 @@ public class BigQueryIOTest implements Serializable {
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
 
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
 
     final Pattern userPattern = Pattern.compile("([a-z]+)([0-9]+)");
     Pipeline p = TestPipeline.create(bqOptions);
@@ -685,7 +685,7 @@ public class BigQueryIOTest implements Serializable {
         new FakeBigQueryServices()
             .withJobService(new FakeJobService())
             .withDatasetService(datasetService);
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
 
     Pipeline p = TestPipeline.create(bqOptions);
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");
@@ -733,7 +733,7 @@ public class BigQueryIOTest implements Serializable {
       elements.add(new TableRow().set("number", i));
     }
 
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
     TestStream<TableRow> testStream =
         TestStream.create(TableRowJsonCoder.of())
             .addElements(
@@ -780,7 +780,7 @@ public class BigQueryIOTest implements Serializable {
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
 
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
 
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");
     TableRow row2 = new TableRow().set("name", "b").set("number", "2");
@@ -827,7 +827,7 @@ public class BigQueryIOTest implements Serializable {
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
 
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
 
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");
     TableRow row2 = new TableRow().set("name", "b").set("number", "2");
@@ -880,7 +880,7 @@ public class BigQueryIOTest implements Serializable {
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
 
-    datasetService.createDataset("defaultproject", "dataset-id", "", "");
+    datasetService.createDataset("defaultproject", "dataset-id", "", "", null);
 
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
@@ -910,7 +910,7 @@ public class BigQueryIOTest implements Serializable {
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     FakeDatasetService datasetService = new FakeDatasetService();
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
             .withDatasetService(datasetService);
 
@@ -1072,7 +1072,7 @@ public class BigQueryIOTest implements Serializable {
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     FakeDatasetService datasetService = new FakeDatasetService();
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withDatasetService(datasetService)
         .withJobService(new FakeJobService());
@@ -1173,7 +1173,7 @@ public class BigQueryIOTest implements Serializable {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
     Pipeline p = TestPipeline.create(bqOptions);
 
     p.apply(Create.of(
@@ -1639,7 +1639,7 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "f").set("number", "6"));
 
     TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
-    datasetService.createDataset(table.getProjectId(), table.getDatasetId(), "", "");
+    datasetService.createDataset(table.getProjectId(), table.getDatasetId(), "", "", null);
     datasetService.createTable(new Table().setTableReference(table));
     datasetService.insertAll(table, expected, null);
 
@@ -1674,7 +1674,7 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "f").set("number", 6L));
 
     TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
-    fakeDatasetService.createDataset("project", "data_set", "", "");
+    fakeDatasetService.createDataset("project", "data_set", "", "", null);
     fakeDatasetService.createTable(new Table().setTableReference(table)
         .setSchema(new TableSchema()
             .setFields(
@@ -1751,7 +1751,7 @@ public class BigQueryIOTest implements Serializable {
     TableReference tempTableReference = createTempTableReference(
         bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
     fakeDatasetService.createDataset(
-        bqOptions.getProject(), tempTableReference.getDatasetId(), "", "");
+        bqOptions.getProject(), tempTableReference.getDatasetId(), "", "", null);
     fakeDatasetService.createTable(new Table()
         .setTableReference(tempTableReference)
         .setSchema(new TableSchema()
@@ -1829,7 +1829,7 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "e").set("number", 5L),
         new TableRow().set("name", "f").set("number", 6L));
     datasetService.createDataset(
-        tempTableReference.getProjectId(), tempTableReference.getDatasetId(), "", "");
+        tempTableReference.getProjectId(), tempTableReference.getDatasetId(), "", "", null);
     Table table = new Table()
         .setTableReference(tempTableReference)
         .setSchema(new TableSchema()
@@ -2072,10 +2072,9 @@ public class BigQueryIOTest implements Serializable {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
 
     Pipeline p = TestPipeline.create(bqOptions);
-
     long numTables = 3;
     long numPartitions = 3;
     long numFilesPerPartition = 10;
@@ -2182,7 +2181,7 @@ public class BigQueryIOTest implements Serializable {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
 
     final int numFinalTables = 3;
     final int numTempTablesPerFinalTable = 3;
@@ -2256,7 +2255,7 @@ public class BigQueryIOTest implements Serializable {
     FakeDatasetService datasetService = new FakeDatasetService();
     String projectId = "project";
     String datasetId = "dataset";
-    datasetService.createDataset(projectId, datasetId, "", "");
+    datasetService.createDataset(projectId, datasetId, "", "", null);
     List<TableReference> tableRefs = Lists.newArrayList(
         BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table1")),
         BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table2")),
@@ -2391,7 +2390,7 @@ public class BigQueryIOTest implements Serializable {
         new FakeBigQueryServices()
             .withJobService(new FakeJobService())
             .withDatasetService(datasetService);
-    datasetService.createDataset("project-id", "dataset-id", "", "");
+    datasetService.createDataset("project-id", "dataset-id", "", "", null);
 
     Pipeline p = TestPipeline.create(bqOptions);
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");

http://git-wip-us.apache.org/repos/asf/beam/blob/f54477ce/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 323f663..4c67a9c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -164,7 +164,11 @@ class FakeDatasetService implements DatasetService, Serializable {
 
   @Override
   public void createDataset(
-      String projectId, String datasetId, String location, String description)
+      String projectId,
+      String datasetId,
+      String location,
+      String description,
+      Long defaultTableExpirationMs /* ignored */)
       throws IOException, InterruptedException {
     synchronized (BigQueryIOTest.tables) {
       Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(projectId, datasetId);
