Fix tests to properly fake out BigQueryServices, and add tests for dynamic-table functionality.
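For readers following the test rewrite in the diff below, the new pattern in a nutshell: instead of Mockito mocks, each test builds a FakeDatasetService, seeds it with a dataset, a table schema, and rows, and hands it to the pipeline through withTestServices(). A condensed sketch of that wiring, not an exact excerpt from the diff; it assumes the package-private test fakes added here (FakeBigQueryServices, FakeDatasetService, FakeJobService) are visible to the test class, elides imports, and uses illustrative project/dataset/table names:

    @Test
    public void readAgainstFakes() throws Exception {
      BigQueryOptions bqOptions =
          TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
      bqOptions.setProject("project-id");
      bqOptions.setTempLocation(Files.createTempDirectory("bqtest").toString());

      // Stand up the fake storage layer and seed it with a schema and rows.
      FakeDatasetService fakeDatasetService = new FakeDatasetService();
      fakeDatasetService.createDataset("project-id", "dataset-id", "", "");
      TableReference tableRef =
          BigQueryHelpers.parseTableSpec("project-id:dataset-id.table");
      fakeDatasetService.createTable(new Table()
          .setTableReference(tableRef)
          .setSchema(new TableSchema().setFields(ImmutableList.of(
              new TableFieldSchema().setName("name").setType("STRING"),
              new TableFieldSchema().setName("number").setType("INTEGER")))));
      List<TableRow> rows = ImmutableList.of(
          new TableRow().set("name", "a").set("number", 1L),
          new TableRow().set("name", "b").set("number", 2L));
      fakeDatasetService.insertAll(tableRef, rows, null);

      // Both the job service and the dataset service are faked; no mocks.
      FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
          .withJobService(new FakeJobService())
          .withDatasetService(fakeDatasetService);

      Pipeline p = TestPipeline.create(bqOptions);
      PCollection<String> names = p
          .apply(BigQueryIO.read().from(tableRef).withTestServices(fakeBqServices))
          .apply(ParDo.of(new DoFn<TableRow, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output((String) c.element().get("name"));
            }
          }));
      PAssert.that(names).containsInAnyOrder("a", "b");
      p.run();
    }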
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b486137d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b486137d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b486137d Branch: refs/heads/DSL_SQL Commit: b486137d2190db9212a92176f703e6ed7858fe59 Parents: 760a945 Author: Reuven Lax <[email protected]> Authored: Fri Mar 31 14:16:48 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Apr 18 21:12:50 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 7 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 15 +- .../beam/sdk/io/gcp/bigquery/ShardedKey.java | 2 +- .../sdk/io/gcp/bigquery/StreamingInserts.java | 5 +- .../sdk/io/gcp/bigquery/StreamingWriteFn.java | 1 - .../sdk/io/gcp/bigquery/TableDestination.java | 3 +- .../sdk/io/gcp/bigquery/TableRowWriter.java | 3 +- .../sdk/io/gcp/bigquery/TagWithUniqueIds.java | 9 - .../io/gcp/bigquery/WriteBundlesToFiles.java | 12 +- .../sdk/io/gcp/bigquery/WritePartition.java | 13 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 613 ++++++++++--------- .../io/gcp/bigquery/FakeBigQueryServices.java | 114 +++- .../sdk/io/gcp/bigquery/FakeDatasetService.java | 138 +++-- .../sdk/io/gcp/bigquery/FakeJobService.java | 182 +++++- .../sdk/io/gcp/bigquery/TableContainer.java | 33 +- 15 files changed, 703 insertions(+), 447 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 5e80fae..06fdfce 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -58,9 +58,8 @@ import org.apache.beam.sdk.values.TupleTagList; /** * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. 
*/ -class BatchLoads<T> extends - PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> { - BigQueryIO.Write<T> write; +class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> { + BigQueryIO.Write<?> write; private static class ConstantSchemaFunction implements SerializableFunction<TableDestination, TableSchema> { @@ -79,7 +78,7 @@ class BatchLoads<T> extends } } - BatchLoads(BigQueryIO.Write<T> write) { + BatchLoads(BigQueryIO.Write<?> write) { this.write = write; } http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index f1baaf7..54a25c7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.Transport; @@ -536,7 +535,7 @@ public class BigQueryIO { } } if (extractFiles != null && !extractFiles.isEmpty()) { - new GcsUtilFactory().create(options).remove(extractFiles); + IOChannelUtils.getFactory(extractFiles.iterator().next()).remove(extractFiles); } } }; @@ -701,8 +700,8 @@ public class BigQueryIO { @AutoValue.Builder abstract static class Builder<T> { abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef); - abstract Builder<T> setTableRefFunction( - SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction); + abstract Builder<T> setTableFunction( + SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction); abstract Builder<T> setFormatFunction( SerializableFunction<T, TableRow> formatFunction); abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema); @@ -823,8 +822,7 @@ public class BigQueryIO { * {@link ValueInSingleWindow}, so can be determined by the value or by the window. */ public Write<T> to( - SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) { - return toTableReference(new TranslateTableSpecFunction<T>(tableSpecFunction)); + SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) { ensureToNotCalledYet(); return toBuilder().setTableFunction(tableFunction).build(); } @@ -834,7 +832,7 @@ public class BigQueryIO { * {@link TableReference} instead of a string table specification. 
*/ private Write<T> toTableReference( - SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction) { + SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) { ensureToNotCalledYet(); return toBuilder().setTableFunction(tableFunction).build(); } @@ -984,8 +982,7 @@ public class BigQueryIO { if (input.isBounded() == IsBounded.UNBOUNDED) { return rowsWithDestination.apply(new StreamingInserts(this)); } else { - - return rowsWithDestination.apply(new BatchLoads<T>(this)); + return rowsWithDestination.apply(new BatchLoads(this)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java index ab57446..09b4fbf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java @@ -56,7 +56,7 @@ class ShardedKey<K> implements Serializable { return false; } ShardedKey<K> other = (ShardedKey<K>) o; - return (key == other.key) && (shardNumber == other.shardNumber); + return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java index 37afbdf..ced1d66 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java @@ -38,9 +38,8 @@ import org.apache.beam.sdk.values.PCollection; * PTransform that performs streaming BigQuery write. To increase consistency, * it leverages BigQuery best effort de-dup mechanism. 
*/ - -class StreamingInserts - extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> { +class StreamingInserts extends PTransform<PCollection<KV<TableDestination, TableRow>>, + WriteResult> { private final Write<?> write; private static class ConstantSchemaFunction implements http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java index 83ed3d2..22b2078 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java @@ -98,7 +98,6 @@ class StreamingWriteFn private void flushRows(TableReference tableReference, List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) throws InterruptedException { - System.out.println("FlUSHING ROWS " + tableRows.size()); if (!tableRows.isEmpty()) { try { long totalBytes = bqServices.getDatasetService(options).insertAll( http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index e8538e0..36e1401 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -64,7 +64,8 @@ public class TableDestination implements Serializable { return false; } TableDestination other = (TableDestination) o; - return (tableSpec == other.tableSpec) && (tableDescription == other.tableDescription); + return Objects.equals(this.tableSpec, other.tableSpec) + && Objects.equals(this.tableDescription, other.tableDescription); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java index a1f6153..ee8f466 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +56,7 @@ class TableRowWriter { } } TableRowWriter(String 
basename) { - this.tempFilePrefix = basename; + this.tempFilePrefix = basename; } public final void open(String uId) throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java index 6f0186e..7379784 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java @@ -18,23 +18,14 @@ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; import java.io.IOException; import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; -import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.ValueInSingleWindow; /** * Fn that tags each table row with a unique id and destination table. 
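The "dynamic-table functionality" named in the commit subject is the to() overload visible in the BigQueryIO.java hunk above: it now takes a SerializableFunction from each ValueInSingleWindow<T> to a TableDestination, so the destination table (and its description) can depend on the element value or its window, and TagWithUniqueIds then carries that destination alongside each uniquely-tagged row. A hedged sketch of a caller, under stated assumptions: the writeTableRows() entry point, the "user" field, and the rows/fakeBqServices variables are illustrative and not taken from this diff:

    rows.apply(BigQueryIO.writeTableRows()
        .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
          @Override
          public TableDestination apply(ValueInSingleWindow<TableRow> value) {
            // Route each element by one of its fields; the second constructor
            // argument is the human-readable table description.
            String user = (String) value.getValue().get("user");
            return new TableDestination(
                "project-id:dataset-id.events_" + user, "events for " + user);
          }
        })
        .withSchema(new TableSchema().setFields(ImmutableList.of(
            new TableFieldSchema().setName("user").setType("STRING"))))
        .withTestServices(fakeBqServices)
        .withoutValidation());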
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index b8069f6..869e68a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -19,19 +19,16 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableRow; - +import com.google.common.collect.Maps; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.util.Map; import java.util.UUID; - -import com.google.common.collect.Maps; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -50,6 +47,10 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund private transient Map<TableDestination, TableRowWriter> writers; private final String tempFilePrefix; + /** + * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file, + * and encapsulates the table it is destined to as well as the file byte size. + */ public static class Result implements Serializable { public String filename; public Long fileByteSize; @@ -62,6 +63,9 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund } } + /** + * a coder for the {@link Result} class. 
+ */ public static class ResultCoder extends AtomicCoder<Result> { private static final ResultCoder INSTANCE = new ResultCoder(); http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index c48955b..9c48b82 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -89,8 +89,8 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List< partitions.add(Lists.<String>newArrayList()); currResultsMap.put(tableDestination, partitions); } - int currNumFiles = currNumFilesMap.getOrDefault(tableDestination, 0); - long currSizeBytes = currSizeBytesMap.getOrDefault(tableDestination, 0L); + int currNumFiles = getOrDefault(currNumFilesMap, tableDestination, 0); + long currSizeBytes = getOrDefault(currSizeBytesMap, tableDestination, 0L); if (currNumFiles + 1 > Write.MAX_NUM_FILES || currSizeBytes + fileResult.fileByteSize > Write.MAX_SIZE_BYTES) { // Add a new partition for this table. @@ -117,4 +117,13 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List< } } } + + private <T> T getOrDefault(Map<TableDestination, T> map, TableDestination tableDestination, + T defaultValue) { + if (map.containsKey(tableDestination)) { + return map.get(tableDestination); + } else { + return defaultValue; + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index d1ef8e2..f10be13 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -26,17 +28,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.when; - -import com.google.api.client.json.GenericJson; + import 
com.google.api.client.util.Data; import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatistics2; import com.google.api.services.bigquery.model.JobStatistics4; @@ -48,7 +42,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -58,9 +52,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.math.BigDecimal; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; @@ -69,14 +66,10 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -88,7 +81,6 @@ import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.options.BigQueryOptions; @@ -122,7 +114,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.PCollectionViews; @@ -140,6 +131,7 @@ import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -147,10 +139,6 @@ import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; /** * Tests for BigQueryIO. 
@@ -158,6 +146,8 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class BigQueryIOTest implements Serializable { + private static Path tempFolder; + // Table information must be static, as each ParDo will get a separate instance of // FakeDatasetServices, and they must all modify the same storage. static com.google.common.collect.Table<String, String, Map<String, TableContainer>> @@ -169,8 +159,6 @@ public class BigQueryIOTest implements Serializable { @Rule public transient ExpectedLogs loggedWriteRename = ExpectedLogs.none(WriteRename.class); @Rule public transient ExpectedLogs loggedWriteTables = ExpectedLogs.none(WriteTables.class); @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); - @Mock private transient IOChannelFactory mockIOChannelFactory; - @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService; private void checkReadTableObject( BigQueryIO.Read read, String project, String dataset, String table) { @@ -227,9 +215,13 @@ public class BigQueryIOTest implements Serializable { assertEquals(validate, write.getValidate()); } + @BeforeClass + public static void setupClass() throws IOException { + tempFolder = Files.createTempDirectory("BigQueryIOTest"); + } + @Before public void setUp() throws IOException { - MockitoAnnotations.initMocks(this); tables = HashBasedTable.create(); BigQueryIO.clearCreatedTables(); } @@ -289,29 +281,53 @@ public class BigQueryIOTest implements Serializable { String tableId = "sometable"; BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject(projectId); - bqOptions.setTempLocation("gs://testbucket/testdir"); + + Path baseDir = Files.createTempDirectory(tempFolder, "testValidateReadSetsDefaultProject"); + bqOptions.setTempLocation(baseDir.toString()); FakeDatasetService fakeDatasetService = new FakeDatasetService(); fakeDatasetService.createDataset(projectId, datasetId, "", ""); TableReference tableReference = new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId); - fakeDatasetService.createTable(new Table().setTableReference(tableReference)); + fakeDatasetService.createTable(new Table() + .setTableReference(tableReference) + .setSchema(new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER"))))); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) .withDatasetService(fakeDatasetService); + List<TableRow> expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", 1L), + new TableRow().set("name", "b").set("number", 2L), + new TableRow().set("name", "c").set("number", 3L), + new TableRow().set("name", "d").set("number", 4L), + new TableRow().set("name", "e").set("number", 5L), + new TableRow().set("name", "f").set("number", 6L)); + fakeDatasetService.insertAll(tableReference, expected, null); + Pipeline p = TestPipeline.create(bqOptions); TableReference tableRef = new TableReference(); tableRef.setDatasetId(datasetId); tableRef.setTableId(tableId); - thrown.expect(RuntimeException.class); - // Message will be one of following depending on the execution environment. 
- thrown.expectMessage(Matchers.containsString("Unsupported")); - p.apply(BigQueryIO.read().from(tableRef) - .withTestServices(fakeBqServices)); + PCollection<KV<String, Long>> output = + p.apply(BigQueryIO.read().from(tableRef).withTestServices(fakeBqServices)) + .apply(ParDo.of(new DoFn<TableRow, KV<String, Long>>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + c.output(KV.of((String) c.element().get("name"), + Long.valueOf((String) c.element().get("number")))); + } + })); + PAssert.that(output).containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L), + KV.of("c", 3L), KV.of("d", 4L), KV.of("e", 5L), KV.of("f", 6L))); + p.run(); } @Test @@ -400,54 +416,32 @@ public class BigQueryIOTest implements Serializable { FakeDatasetService fakeDatasetService = new FakeDatasetService(); fakeDatasetService.createDataset("non-executing-project", "somedataset", "", ""); fakeDatasetService.createTable(sometable); - SerializableFunction<Void, Schema> schemaGenerator = - new SerializableFunction<Void, Schema>() { - @Override - public Schema apply(Void input) { - return BigQueryAvroUtils.toGenericAvroSchema( - "sometable", - ImmutableList.of( - new TableFieldSchema().setName("name").setType("STRING"), - new TableFieldSchema().setName("number").setType("INTEGER"))); - } - }; - Collection<Map<String, Object>> records = - ImmutableList.<Map<String, Object>>builder() - .add(ImmutableMap.<String, Object>builder().put("name", "a").put("number", 1L).build()) - .add(ImmutableMap.<String, Object>builder().put("name", "b").put("number", 2L).build()) - .add(ImmutableMap.<String, Object>builder().put("name", "c").put("number", 3L).build()) - .build(); - SerializableFunction<GenericJson, Void> onStartJob = - new WriteExtractFiles(schemaGenerator, records); + List<TableRow> records = Lists.newArrayList( + new TableRow().set("name", "a").set("number", 1L), + new TableRow().set("name", "b").set("number", 2L), + new TableRow().set("name", "c").set("number", 3L)); + fakeDatasetService.insertAll(sometable.getTableReference(), records, null); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) - // .startJobReturns(onStartJob, "done") - // .pollJobReturns(job) - // .getJobReturns((Job) null) - // .verifyExecutingProject(bqOptions.getProject())) - .withDatasetService(fakeDatasetService) - .readerReturns( - toJsonString(new TableRow().set("name", "a").set("number", 1)), - toJsonString(new TableRow().set("name", "b").set("number", 2)), - toJsonString(new TableRow().set("name", "c").set("number", 3))); + .withDatasetService(fakeDatasetService); Pipeline p = TestPipeline.create(bqOptions); - PCollection<String> output = p + PCollection<KV<String, Long>> output = p .apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable") .withTestServices(fakeBqServices) .withoutValidation()) - .apply(ParDo.of(new DoFn<TableRow, String>() { + .apply(ParDo.of(new DoFn<TableRow, KV<String, Long>>() { @ProcessElement public void processElement(ProcessContext c) throws Exception { - c.output((String) c.element().get("name")); + c.output(KV.of((String) c.element().get("name"), + Long.valueOf((String) c.element().get("number")))); } })); PAssert.that(output) - .containsInAnyOrder(ImmutableList.of("a", "b", "c")); - + .containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L), KV.of("c", 3L))); p.run(); } @@ -457,13 +451,12 @@ public class BigQueryIOTest implements Serializable { bqOptions.setProject("defaultproject"); 
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + FakeDatasetService datasetService = new FakeDatasetService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) - // .startJobReturns("done", "done", "done") - // .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)) - .withDatasetService(mockDatasetService); + .withDatasetService(datasetService); - mockDatasetService.createDataset("defaultproject", "dataset-id", "", ""); + datasetService.createDataset("defaultproject", "dataset-id", "", ""); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -715,11 +708,11 @@ public class BigQueryIOTest implements Serializable { bqOptions.setProject("defaultproject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + FakeDatasetService datasetService = new FakeDatasetService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService()); - // .startJobReturns("done", "done") - // .pollJobReturns(Status.FAILED, Status.UNKNOWN)); - + .withJobService(new FakeJobService()) + .withDatasetService(datasetService); + datasetService.createDataset("project-id", "dataset-id", "", ""); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( new TableRow().set("name", "a").set("number", 1), @@ -732,7 +725,7 @@ public class BigQueryIOTest implements Serializable { .withoutValidation()); thrown.expect(RuntimeException.class); - thrown.expectMessage("UNKNOWN status of load job"); + thrown.expectMessage("Failed to create load job"); try { p.run(); } finally { @@ -747,10 +740,10 @@ public class BigQueryIOTest implements Serializable { bqOptions.setProject("defaultproject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + FakeDatasetService datasetService = new FakeDatasetService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService()); - // .startJobReturns("done", "done", "done") - // .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED)); + .withJobService(new FakeJobService()) + .withDatasetService(datasetService); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -817,7 +810,7 @@ public class BigQueryIOTest implements Serializable { BigQueryIO.Read read = BigQueryIO.read() .from("project:dataset.tableId") .withTestServices(new FakeBigQueryServices() - .withDatasetService(mockDatasetService) + .withDatasetService(new FakeDatasetService()) .withJobService(new FakeJobService())) .withoutValidation(); @@ -833,7 +826,7 @@ public class BigQueryIOTest implements Serializable { BigQueryIO.Read read = BigQueryIO.read() .fromQuery("foobar") .withTestServices(new FakeBigQueryServices() - .withDatasetService(mockDatasetService) + .withDatasetService(new FakeDatasetService()) .withJobService(new FakeJobService())) .withoutValidation(); @@ -874,7 +867,7 @@ public class BigQueryIOTest implements Serializable { .to("project:dataset.table") .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) .withTestServices(new FakeBigQueryServices() - .withDatasetService(mockDatasetService) + .withDatasetService(new FakeDatasetService()) .withJobService(new FakeJobService())) .withoutValidation(); @@ -1040,9 +1033,7 @@ public class BigQueryIOTest implements Serializable { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) - 
.withDatasetService(mockDatasetService); - when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow( - new RuntimeException("Unable to confirm BigQuery dataset presence")); + .withDatasetService(new FakeDatasetService()); Pipeline p = TestPipeline.create(options); @@ -1206,26 +1197,31 @@ public class BigQueryIOTest implements Serializable { @Test public void testBigQueryTableSourceThroughJsonAPI() throws Exception { + FakeDatasetService datasetService = new FakeDatasetService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) - .readerReturns( - toJsonString(new TableRow().set("name", "a").set("number", "1")), - toJsonString(new TableRow().set("name", "b").set("number", "2")), - toJsonString(new TableRow().set("name", "c").set("number", "3"))); + .withDatasetService(datasetService); + List<TableRow> expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3"), + new TableRow().set("name", "d").set("number", "4"), + new TableRow().set("name", "e").set("number", "5"), + new TableRow().set("name", "f").set("number", "6")); + + TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); + datasetService.createDataset(table.getProjectId(), table.getDatasetId(), "", ""); + datasetService.createTable(new Table().setTableReference(table)); + datasetService.insertAll(table, expected, null); + + Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI"); String jobIdToken = "testJobIdToken"; - TableReference table = BigQueryHelpers.parseTableSpec("project.data_set.table_name"); - String extractDestinationDir = "mock://tempLocation"; BoundedSource<TableRow> bqSource = BigQueryTableSource.create( StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table), - extractDestinationDir, fakeBqServices, + baseDir.toString(), fakeBqServices, StaticValueProvider.of("project")); - List<TableRow> expected = ImmutableList.of( - new TableRow().set("name", "a").set("number", "1"), - new TableRow().set("name", "b").set("number", "2"), - new TableRow().set("name", "c").set("number", "3")); - PipelineOptions options = PipelineOptionsFactory.create(); Assert.assertThat( SourceTestUtils.readFromSource(bqSource, options), @@ -1244,43 +1240,48 @@ public class BigQueryIOTest implements Serializable { extractJob.setStatus(new JobStatus()) .setStatistics(jobStats); + FakeDatasetService fakeDatasetService = new FakeDatasetService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) - .withDatasetService(mockDatasetService) - .readerReturns( - toJsonString(new TableRow().set("name", "a").set("number", "1")), - toJsonString(new TableRow().set("name", "b").set("number", "2")), - toJsonString(new TableRow().set("name", "c").set("number", "3"))); + .withDatasetService(fakeDatasetService); + + List<TableRow> expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", 1L), + new TableRow().set("name", "b").set("number", 2L), + new TableRow().set("name", "c").set("number", 3L), + new TableRow().set("name", "d").set("number", 4L), + new TableRow().set("name", "e").set("number", 5L), + new TableRow().set("name", "f").set("number", 6L)); - String jobIdToken = "testJobIdToken"; TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); - String extractDestinationDir = 
"mock://tempLocation"; + fakeDatasetService.createDataset("project", "data_set", "", ""); + fakeDatasetService.createTable(new Table().setTableReference(table) + .setSchema(new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER"))))); + fakeDatasetService.insertAll(table, expected, null); + + Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit"); + + String jobIdToken = "testJobIdToken"; + String extractDestinationDir = baseDir.toString(); BoundedSource<TableRow> bqSource = BigQueryTableSource.create( StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table), extractDestinationDir, fakeBqServices, StaticValueProvider.of("project")); - List<TableRow> expected = ImmutableList.of( - new TableRow().set("name", "a").set("number", "1"), - new TableRow().set("name", "b").set("number", "2"), - new TableRow().set("name", "c").set("number", "3")); PipelineOptions options = PipelineOptionsFactory.create(); - options.setTempLocation("mock://tempLocation"); - - IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */); - when(mockIOChannelFactory.resolve(anyString(), anyString())) - .thenReturn("mock://tempLocation/output"); - when(mockDatasetService.getTable(any(TableReference.class))) - .thenReturn(new Table().setSchema(new TableSchema())); + options.setTempLocation(baseDir.toString()); - Assert.assertThat( - SourceTestUtils.readFromSource(bqSource, options), - CoreMatchers.is(expected)); + List<TableRow> read = SourceTestUtils.readFromSource(bqSource, options); + assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class))); SourceTestUtils.assertSplitAtFractionBehavior( bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); List<? 
extends BoundedSource<TableRow>> sources = bqSource.split(100, options); - assertEquals(1, sources.size()); + assertEquals(2, sources.size()); BoundedSource<TableRow> actual = sources.get(0); assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); } @@ -1306,80 +1307,63 @@ public class BigQueryIOTest implements Serializable { .setStatistics(extractJobStats); FakeJobService fakeJobService = new FakeJobService(); + FakeDatasetService fakeDatasetService = new FakeDatasetService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(fakeJobService) - .withDatasetService(mockDatasetService) - .readerReturns( - toJsonString(new TableRow().set("name", "a").set("number", "1")), - toJsonString(new TableRow().set("name", "b").set("number", "2")), - toJsonString(new TableRow().set("name", "c").set("number", "3"))); + .withDatasetService(fakeDatasetService); + + List<TableRow> expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", 1L), + new TableRow().set("name", "b").set("number", 2L), + new TableRow().set("name", "c").set("number", 3L), + new TableRow().set("name", "d").set("number", 4L), + new TableRow().set("name", "e").set("number", 5L), + new TableRow().set("name", "f").set("number", 6L)); - String jobIdToken = "testJobIdToken"; - String extractDestinationDir = "mock://tempLocation"; TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); + fakeDatasetService.createDataset("project", "data_set", "", ""); + fakeDatasetService.createTable(new Table() + .setTableReference(destinationTable) + .setSchema(new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER"))))); + Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryQuerySourceInitSplit"); + + String jobIdToken = "testJobIdToken"; + String query = FakeBigQueryServices.encodeQuery(expected); + String extractDestinationDir = baseDir.toString(); BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( - StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"), + StaticValueProvider.of(jobIdToken), StaticValueProvider.of(query), StaticValueProvider.of(destinationTable), true /* flattenResults */, true /* useLegacySql */, extractDestinationDir, fakeBqServices); - List<TableRow> expected = ImmutableList.of( - new TableRow().set("name", "a").set("number", "1"), - new TableRow().set("name", "b").set("number", "2"), - new TableRow().set("name", "c").set("number", "3")); - PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(extractDestinationDir); TableReference queryTable = new TableReference() - .setProjectId("testproject") - .setDatasetId("testDataset") - .setTableId("testTable"); - // when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any())) - // .thenReturn(new JobStatistics().setQuery( - // new JobStatistics2() - // .setTotalBytesProcessed(100L) - // .setReferencedTables(ImmutableList.of(queryTable)))); - fakeJobService.expectDryRunQuery("testproject", "query", + .setProjectId("project") + .setDatasetId("data_set") + .setTableId("table_name"); + + fakeJobService.expectDryRunQuery("project", query, new JobStatistics().setQuery( new JobStatistics2() .setTotalBytesProcessed(100L) .setReferencedTables(ImmutableList.of(queryTable)))); - // when(mockDatasetService.getTable(eq(queryTable))) - // .thenReturn(new Table().setSchema(new TableSchema())); - 
// when(mockDatasetService.getTable(eq(destinationTable))) - // .thenReturn(new Table().setSchema(new TableSchema())); - IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */); - when(mockIOChannelFactory.resolve(anyString(), anyString())) - .thenReturn("mock://tempLocation/output"); - //when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt())) - // .thenReturn(extractJob); - - Assert.assertThat( - SourceTestUtils.readFromSource(bqSource, options), - CoreMatchers.is(expected)); + List<TableRow> read = SourceTestUtils.readFromSource(bqSource, options); + assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class))); SourceTestUtils.assertSplitAtFractionBehavior( bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options); - assertEquals(1, sources.size()); + assertEquals(2, sources.size()); BoundedSource<TableRow> actual = sources.get(0); assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); - - /* - Mockito.verify(mockJobService) - .startQueryJob( - Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any()); - Mockito.verify(mockJobService) - .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any()); - Mockito.verify(mockDatasetService) - .createDataset(anyString(), anyString(), anyString(), anyString()); - ArgumentCaptor<JobConfigurationQuery> queryConfigArg = - ArgumentCaptor.forClass(JobConfigurationQuery.class); - Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture()); - assertEquals(true, queryConfigArg.getValue().getFlattenResults()); - assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/ } @Test @@ -1402,68 +1386,60 @@ public class BigQueryIOTest implements Serializable { extractJob.setStatus(new JobStatus()) .setStatistics(extractJobStats); + FakeDatasetService datasetService = new FakeDatasetService(); + FakeJobService jobService = new FakeJobService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService()) - .withDatasetService(mockDatasetService) - .readerReturns( - toJsonString(new TableRow().set("name", "a").set("number", "1")), - toJsonString(new TableRow().set("name", "b").set("number", "2")), - toJsonString(new TableRow().set("name", "c").set("number", "3"))); + .withJobService(jobService) + .withDatasetService(datasetService); - String jobIdToken = "testJobIdToken"; - String extractDestinationDir = "mock://tempLocation"; TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); + List<TableRow> expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", 1L), + new TableRow().set("name", "b").set("number", 2L), + new TableRow().set("name", "c").set("number", 3L), + new TableRow().set("name", "d").set("number", 4L), + new TableRow().set("name", "e").set("number", 5L), + new TableRow().set("name", "f").set("number", 6L)); + datasetService.createDataset(destinationTable.getProjectId(), destinationTable.getDatasetId(), + "", ""); + Table table = new Table() + .setTableReference(destinationTable) + .setSchema(new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))); + datasetService.createTable(table); + + String query = FakeBigQueryServices.encodeQuery(expected); + 
jobService.expectDryRunQuery("project", query, + new JobStatistics().setQuery( + new JobStatistics2() + .setTotalBytesProcessed(100L) + .setReferencedTables(ImmutableList.of(table.getTableReference())))); + + Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryNoTableQuerySourceInitSplit"); + String jobIdToken = "testJobIdToken"; BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( - StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"), + StaticValueProvider.of(jobIdToken), + StaticValueProvider.of(query), StaticValueProvider.of(destinationTable), - true /* flattenResults */, true /* useLegacySql */, - extractDestinationDir, fakeBqServices); + true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices); - List<TableRow> expected = ImmutableList.of( - new TableRow().set("name", "a").set("number", "1"), - new TableRow().set("name", "b").set("number", "2"), - new TableRow().set("name", "c").set("number", "3")); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setTempLocation(extractDestinationDir); - - /* - when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any())) - .thenReturn(new JobStatistics().setQuery( - new JobStatistics2() - .setTotalBytesProcessed(100L))); - when(mockDatasetService.getTable(eq(destinationTable))) - .thenReturn(new Table().setSchema(new TableSchema())); - IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true); - when(mockIOChannelFactory.resolve(anyString(), anyString())) - .thenReturn("mock://tempLocation/output"); - when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt())) - .thenReturn(extractJob);*/ - Assert.assertThat( - SourceTestUtils.readFromSource(bqSource, options), - CoreMatchers.is(expected)); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation(baseDir.toString()); + List<TableRow> read = convertBigDecimaslToLong( + SourceTestUtils.readFromSource(bqSource, options)); + assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class))); SourceTestUtils.assertSplitAtFractionBehavior( bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options); - assertEquals(1, sources.size()); + assertEquals(2, sources.size()); BoundedSource<TableRow> actual = sources.get(0); assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); - - /* - Mockito.verify(Service) - .startQueryJob( - Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any()); - Mockito.verify(mockJobService) - .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any()); - Mockito.verify(mockDatasetService) - .createDataset(anyString(), anyString(), anyString(), anyString()); - ArgumentCaptor<JobConfigurationQuery> queryConfigArg = - ArgumentCaptor.forClass(JobConfigurationQuery.class); - Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture()); - assertEquals(true, queryConfigArg.getValue().getFlattenResults()); - assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/ } @Test @@ -1604,12 +1580,27 @@ public class BigQueryIOTest implements Serializable { throws Exception { p.enableAbandonedNodeEnforcement(false); + // In the case where a static destination is specified (i.e. not through a dynamic table + // function) and there is no input data, WritePartition will generate an empty table. This + // code is to test that path. 
+ TableReference singletonReference = new TableReference() + .setProjectId("projectid") + .setDatasetId("dataset") + .setTableId("table"); + String singletonDescription = "singleton"; + boolean isSingleton = numTables == 1 && numFilesPerTable == 0; + List<ShardedKey<TableDestination>> expectedPartitions = Lists.newArrayList(); - for (int i = 0; i < numTables; ++i) { - for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) { - String tableName = String.format("project-id:dataset-id.tables%05d", i); - TableDestination destination = new TableDestination(tableName, tableName); - expectedPartitions.add(ShardedKey.of(destination, j)); + if (isSingleton) { + expectedPartitions.add(ShardedKey.of( + new TableDestination(singletonReference, singletonDescription), 1)); + } else { + for (int i = 0; i < numTables; ++i) { + for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) { + String tableName = String.format("project-id:dataset-id.tables%05d", i); + TableDestination destination = new TableDestination(tableName, tableName); + expectedPartitions.add(ShardedKey.of(destination, j)); + } } } @@ -1642,11 +1633,7 @@ public class BigQueryIOTest implements Serializable { WriteBundlesToFiles.ResultCoder.of()); ValueProvider<String> singletonTable = null; - if (numFilesPerTable == 0 && numTables == 1) { - TableReference singletonReference = new TableReference() - .setProjectId("projectid") - .setDatasetId("dataset") - .setTableId("table"); + if (isSingleton) { singletonTable = StaticValueProvider.of(BigQueryHelpers.toJsonString(singletonReference)); } WritePartition writePartition = @@ -1680,12 +1667,10 @@ public class BigQueryIOTest implements Serializable { tableFilesResult.addAll(partition.getValue()); } - assertEquals(expectedPartitions.size(), partitionsResult.size()); + assertThat(partitionsResult, + containsInAnyOrder(Iterables.toArray(expectedPartitions, ShardedKey.class))); - // assertThat(partitionsResult, - // containsInAnyOrder(Iterables.toArray(expectedPartitions, ShardedKey.class))); - - if (numFilesPerTable == 0 && numTables == 1) { + if (isSingleton) { assertEquals(1, filesPerTableResult.size()); List<String> singletonFiles = filesPerTableResult.values().iterator().next(); assertTrue(Files.exists(Paths.get(singletonFiles.get(0)))); @@ -1700,15 +1685,11 @@ public class BigQueryIOTest implements Serializable { public void testWriteTables() throws Exception { p.enableAbandonedNodeEnforcement(false); + FakeDatasetService datasetService = new FakeDatasetService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) - // .startJobReturns("done", "done", "done", "done", "done", "done", "done", "done", - // "done", "done") - // .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED, - // Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED, - // Status.SUCCEEDED, Status.SUCCEEDED)) - .withDatasetService(mockDatasetService); - + .withDatasetService(datasetService); + datasetService.createDataset("project-id", "dataset-id", "", ""); long numTables = 3; long numPartitions = 3; long numFilesPerPartition = 10; @@ -1716,6 +1697,8 @@ public class BigQueryIOTest implements Serializable { String tempFilePrefix = "tempFilePrefix"; Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap(); + Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables"); + List<KV<ShardedKey<TableDestination>, Iterable<List<String>>>> partitions = Lists.newArrayList(); for (int i = 0; i < numTables; 
++i) { @@ -1726,7 +1709,16 @@ public class BigQueryIOTest implements Serializable { jobIdToken + "_0x%08x_%05d", tableDestination.hashCode(), j); List<String> filesPerPartition = Lists.newArrayList(); for (int k = 0; k < numFilesPerPartition; ++k) { - filesPerPartition.add(String.format("files0x%08x_%05d", tableDestination.hashCode(), k)); + String filename = Paths.get(baseDir.toString(), + String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString(); + try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.TEXT)) { + try (OutputStream output = Channels.newOutputStream(channel)) { + TableRow tableRow = new TableRow().set("name", tableName); + TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER); + output.write("\n".getBytes(StandardCharsets.UTF_8)); + } + } + filesPerPartition.add(filename); } partitions.add(KV.of(ShardedKey.of(tableDestination, j), (Iterable<List<String>>) Collections.singleton(filesPerPartition))); @@ -1814,25 +1806,45 @@ public class BigQueryIOTest implements Serializable { public void testWriteRename() throws Exception { p.enableAbandonedNodeEnforcement(false); + FakeDatasetService datasetService = new FakeDatasetService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) - // .startJobReturns("done", "done") - // .pollJobReturns(Status.FAILED, Status.SUCCEEDED)) - .withDatasetService(mockDatasetService); + .withDatasetService(datasetService); + datasetService.createDataset("project-id", "dataset-id", "", ""); - int numFinalTables = 3; - int numTempTables = 3; + final int numFinalTables = 3; + final int numTempTablesPerFinalTable = 3; + final int numRecordsPerTempTable = 10; + + Map<TableDestination, List<TableRow>> expectedRowsPerTable = Maps.newHashMap(); String jobIdToken = "jobIdToken"; - String jsonTable = "{}"; Map<TableDestination, Iterable<String>> tempTables = Maps.newHashMap(); for (int i = 0; i < numFinalTables; ++i) { String tableName = "project-id:dataset-id.table_" + i; - TableDestination tableDestination = new TableDestination(tableName, tableName); + TableDestination tableDestination = new TableDestination( + tableName, "table_" + i + "_desc"); List<String> tables = Lists.newArrayList(); tempTables.put(tableDestination, tables); - for (int j = 0; i < numTempTables; ++i) { - tables.add(String.format( - "{\"project-id:dataset-id.tableId\":\"%s_%05d_%05d\"}", jobIdToken, i, j)); + + List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination); + if (expectedRows == null) { + expectedRows = Lists.newArrayList(); + expectedRowsPerTable.put(tableDestination, expectedRows); + } + for (int j = 0; i < numTempTablesPerFinalTable; ++i) { + TableReference tempTable = new TableReference() + .setProjectId("project-id") + .setDatasetId("dataset-id") + .setTableId(String.format("%s_%05d_%05d", jobIdToken, i, j)); + datasetService.createTable(new Table().setTableReference(tempTable)); + + List<TableRow> rows = Lists.newArrayList(); + for (int k = 0; k < numRecordsPerTempTable; ++k) { + rows.add(new TableRow().set("number", j * numTempTablesPerFinalTable + k)); + } + datasetService.insertAll(tempTable, rows, null); + expectedRows.addAll(rows); + tables.add(BigQueryHelpers.toJsonString(tempTable)); } } @@ -1857,37 +1869,52 @@ public class BigQueryIOTest implements Serializable { tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables); tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken); tester.processElement(null); + + for 
(Map.Entry<TableDestination, Iterable<String>> entry : tempTables.entrySet()) { + TableDestination tableDestination = entry.getKey(); + TableReference tableReference = tableDestination.getTableReference(); + Table table = checkNotNull(datasetService.getTable(tableReference)); + assertEquals(tableReference.getTableId() + "_desc", tableDestination.getTableDescription()); + + List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination); + assertThat(datasetService.getAllRows(tableReference.getProjectId(), + tableReference.getDatasetId(), tableReference.getTableId()), + containsInAnyOrder(Iterables.toArray(expectedRows, TableRow.class))); + + // Temp tables should be deleted. + for (String tempTableJson : entry.getValue()) { + TableReference tempTable = BigQueryHelpers.fromJsonString( + tempTableJson, TableReference.class); + assertEquals(null, datasetService.getTable(tempTable)); + } + } } @Test public void testRemoveTemporaryTables() throws Exception { - String projectId = "someproject"; - String datasetId = "somedataset"; - List<String> tables = Lists.newArrayList("table1", "table2", "table3"); + FakeDatasetService datasetService = new FakeDatasetService(); + String projectId = "project"; + String datasetId = "dataset"; + datasetService.createDataset(projectId, datasetId, "", ""); List<TableReference> tableRefs = Lists.newArrayList( - BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, - tables.get(0))), - BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, - tables.get(1))), - BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, - tables.get(2)))); + BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table1")), + BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table2")), + BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table3"))); + for (TableReference tableRef : tableRefs) { + datasetService.createTable(new Table().setTableReference(tableRef)); + } - doThrow(new IOException("Unable to delete table")) - .when(mockDatasetService).deleteTable(tableRefs.get(0)); - doNothing().when(mockDatasetService).deleteTable(tableRefs.get(1)); - doNothing().when(mockDatasetService).deleteTable(tableRefs.get(2)); + // Add one more table to delete that does not actually exist. + tableRefs.add( + BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table4"))); - WriteRename.removeTemporaryTables(mockDatasetService, tableRefs); + WriteRename.removeTemporaryTables(datasetService, tableRefs); for (TableReference ref : tableRefs) { loggedWriteRename.verifyDebug("Deleting table " + toJsonString(ref)); + checkState(datasetService.getTable(ref) == null, + "Table " + ref + " was not deleted!"); } - loggedWriteRename.verifyWarn("Failed to delete the table " - + toJsonString(tableRefs.get(0))); - loggedWriteRename.verifyNotLogged("Failed to delete the table " - + toJsonString(tableRefs.get(1))); - loggedWriteRename.verifyNotLogged("Failed to delete the table " - + toJsonString(tableRefs.get(2))); } /** Test options. 
@@ -1957,43 +1984,6 @@ public class BigQueryIOTest implements Serializable {
       }}).length);
   }

-  private class WriteExtractFiles implements SerializableFunction<GenericJson, Void> {
-    private final SerializableFunction<Void, Schema> schemaGenerator;
-    private final Collection<Map<String, Object>> records;
-
-    private WriteExtractFiles(
-        SerializableFunction<Void, Schema> schemaGenerator,
-        Collection<Map<String, Object>> records) {
-      this.schemaGenerator = schemaGenerator;
-      this.records = records;
-    }
-
-    @Override
-    public Void apply(GenericJson input) {
-      List<String> destinations = (List<String>) input.get("destinationUris");
-      for (String destination : destinations) {
-        String newDest = destination.replace("*", "000000000000");
-        Schema schema = schemaGenerator.apply(null);
-        try (WritableByteChannel channel = IOChannelUtils.create(newDest, MimeTypes.BINARY);
-            DataFileWriter<GenericRecord> tableRowWriter =
-                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))
-                    .create(schema, Channels.newOutputStream(channel))) {
-          for (Map<String, Object> record : records) {
-            GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schema);
-            for (Map.Entry<String, Object> field : record.entrySet()) {
-              genericRecordBuilder.set(field.getKey(), field.getValue());
-            }
-            tableRowWriter.append(genericRecordBuilder.build());
-          }
-        } catch (IOException e) {
-          throw new IllegalStateException(
-              String.format("Could not create destination for extract job %s", destination), e);
-        }
-      }
-      return null;
-    }
-  }
-
   @Test
   public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() {
     CoderProperties.coderSerializable(ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE));
@@ -2013,4 +2003,19 @@
             TableRowInfoCoder.of()),
         IntervalWindow.getCoder()));
   }
+
+  List<TableRow> convertBigDecimalsToLong(List<TableRow> toConvert) {
+    // The numbers come back as BigDecimal objects after JSON serialization. Change them back to
+    // longs so that we can assert the output.
+    List<TableRow> converted = Lists.newArrayList();
+    for (TableRow entry : toConvert) {
+      TableRow convertedEntry = entry.clone();
+      Object num = convertedEntry.get("number");
+      if (num instanceof BigDecimal) {
+        convertedEntry.set("number", ((BigDecimal) num).longValue());
+      }
+      converted.add(convertedEntry);
+    }
+    return converted;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index ed3ab37..6dfd9d7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -1,39 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.sdk.io.gcp.bigquery;

-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
 import static org.junit.Assert.assertEquals;

+import com.google.api.client.util.Base64;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Lists;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.List;
 import java.util.NoSuchElementException;
+
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.options.BigQueryOptions;

 /**
- * Created by relax on 3/30/17.
+ * A fake implementation of BigQuery services for testing.
  */
 class FakeBigQueryServices implements BigQueryServices {
-  private String[] jsonTableRowReturns = new String[0];
   private JobService jobService;
-  private DatasetService datasetService;
+  private FakeDatasetService datasetService;

-  public FakeBigQueryServices withJobService(JobService jobService) {
+  FakeBigQueryServices withJobService(JobService jobService) {
     this.jobService = jobService;
     return this;
   }

-  public FakeBigQueryServices withDatasetService(DatasetService datasetService) {
+  FakeBigQueryServices withDatasetService(FakeDatasetService datasetService) {
     this.datasetService = datasetService;
     return this;
   }

-  public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) {
-    this.jsonTableRowReturns = jsonTableRowReturns;
-    return this;
-  }
-
   @Override
   public JobService getJobService(BigQueryOptions bqOptions) {
     return jobService;
@@ -45,26 +65,58 @@ class FakeBigQueryServices implements BigQueryServices {
   }

   @Override
-  public BigQueryJsonReader getReaderFromTable(
-      BigQueryOptions bqOptions, TableReference tableRef) {
-    return new FakeBigQueryReader(jsonTableRowReturns);
+  public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) {
+    try {
+      List<TableRow> rows = datasetService.getAllRows(
+          tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId());
+      return new FakeBigQueryReader(rows);
+    } catch (Exception e) {
+      return null;
+    }
   }

   @Override
   public BigQueryJsonReader getReaderFromQuery(
       BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
-    return new FakeBigQueryReader(jsonTableRowReturns);
+    try {
+      List<TableRow> rows = rowsFromEncodedQuery(queryConfig.getQuery());
+      return new FakeBigQueryReader(rows);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  static List<TableRow> rowsFromEncodedQuery(String query) throws IOException {
+    ListCoder<TableRow> listCoder = ListCoder.of(TableRowJsonCoder.of());
+    ByteArrayInputStream input = new ByteArrayInputStream(Base64.decodeBase64(query));
+    List<TableRow> rows = listCoder.decode(input, Context.OUTER);
+    for (TableRow row : rows) {
+      convertNumbers(row);
+    }
+    return rows;
+  }
+
+  static String encodeQuery(List<TableRow> rows) throws IOException {
+    ListCoder<TableRow> listCoder = ListCoder.of(TableRowJsonCoder.of());
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    listCoder.encode(rows, output, Context.OUTER);
+    return Base64.encodeBase64String(output.toByteArray());
   }

   private static class FakeBigQueryReader implements BigQueryJsonReader {
     private static final int UNSTARTED = -1;
     private static final int CLOSED = Integer.MAX_VALUE;

-    private String[] jsonTableRowReturns;
+    private List<byte[]> serializedTableRowReturns;
     private int currIndex;

-    FakeBigQueryReader(String[] jsonTableRowReturns) {
-      this.jsonTableRowReturns = jsonTableRowReturns;
+    FakeBigQueryReader(List<TableRow> tableRowReturns) throws IOException {
+      this.serializedTableRowReturns = Lists.newArrayListWithExpectedSize(tableRowReturns.size());
+      for (TableRow tableRow : tableRowReturns) {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
+        serializedTableRowReturns.add(output.toByteArray());
+      }
       this.currIndex = UNSTARTED;
     }

@@ -72,20 +124,27 @@ class FakeBigQueryServices implements BigQueryServices {
     public boolean start() throws IOException {
       assertEquals(UNSTARTED, currIndex);
       currIndex = 0;
-      return currIndex < jsonTableRowReturns.length;
+      return currIndex < serializedTableRowReturns.size();
     }

     @Override
     public boolean advance() throws IOException {
-      return ++currIndex < jsonTableRowReturns.length;
+      return ++currIndex < serializedTableRowReturns.size();
     }

     @Override
     public TableRow getCurrent() throws NoSuchElementException {
-      if (currIndex >= jsonTableRowReturns.length) {
+      if (currIndex >= serializedTableRowReturns.size()) {
         throw new NoSuchElementException();
       }
-      return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class);
+
+      ByteArrayInputStream input = new ByteArrayInputStream(
+          serializedTableRowReturns.get(currIndex));
+      try {
+        return convertNumbers(TableRowJsonCoder.of().decode(input, Context.OUTER));
+      } catch (IOException e) {
+        return null;
+      }
     }

     @Override
@@ -93,4 +152,15 @@ class FakeBigQueryServices implements BigQueryServices {
       currIndex = CLOSED;
     }
   }
+
+
+  // Longs tend to get converted back to Integers due to JSON serialization. Convert them back.
+  static TableRow convertNumbers(TableRow tableRow) {
+    for (TableRow.Entry entry : tableRow.entrySet()) {
+      if (entry.getValue() instanceof Integer) {
+        entry.setValue(new Long((Integer) entry.getValue()));
+      }
+    }
+    return tableRow;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 9b2cf63..5103adb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -1,9 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;

-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;

+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpHeaders;
 import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -24,13 +44,13 @@ class FakeDatasetService implements DatasetService, Serializable {
       throws InterruptedException, IOException {
     synchronized (BigQueryIOTest.tables) {
       Map<String, TableContainer> dataset =
-          checkNotNull(
-              BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId()),
-              "Tried to get a dataset %s:%s from %s, but no such dataset was set",
-              tableRef.getProjectId(),
-              tableRef.getDatasetId(),
-              tableRef.getTableId(),
-              FakeDatasetService.class.getSimpleName());
+          BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
+      if (dataset == null) {
+        throwNotFound(
+            "Tried to get a dataset %s:%s, but no such dataset was set",
+            tableRef.getProjectId(),
+            tableRef.getDatasetId());
+      }
       TableContainer tableContainer = dataset.get(tableRef.getTableId());
       return tableContainer == null ? null : tableContainer.getTable();
     }
@@ -44,27 +64,40 @@ class FakeDatasetService implements DatasetService, Serializable {
   }

   private TableContainer getTableContainer(String projectId, String datasetId, String tableId)
-      throws InterruptedException, IOException {
-    synchronized (BigQueryIOTest.tables) {
-      Map<String, TableContainer> dataset =
-          checkNotNull(
-              BigQueryIOTest.tables.get(projectId, datasetId),
-              "Tried to get a dataset %s:%s from %s, but no such dataset was set",
-              projectId,
-              datasetId,
-              FakeDatasetService.class.getSimpleName());
-      return checkNotNull(dataset.get(tableId),
-          "Tried to get a table %s:%s.%s from %s, but no such table was set",
-          projectId,
-          datasetId,
-          tableId,
-          FakeDatasetService.class.getSimpleName());
-    }
+      throws InterruptedException, IOException {
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(projectId, datasetId);
+      if (dataset == null) {
+        throwNotFound(
+            "Tried to get a dataset %s:%s, but no such dataset was set",
+            projectId,
+            datasetId);
+      }
+      TableContainer tableContainer = dataset.get(tableId);
+      if (tableContainer == null) {
+        throwNotFound(
+            "Tried to get a table %s:%s.%s, but no such table was set",
+            projectId,
+            datasetId,
+            tableId);
+      }
+      return tableContainer;
+    }
   }

   @Override
   public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
-    throw new UnsupportedOperationException("Unsupported");
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset =
+          BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
+      if (dataset == null) {
+        throwNotFound(
+            "Tried to get a dataset %s:%s, but no such dataset was set",
+            tableRef.getProjectId(),
+            tableRef.getDatasetId());
+      }
+      dataset.remove(tableRef.getTableId());
+    }
   }
@@ -73,13 +106,13 @@ class FakeDatasetService implements DatasetService, Serializable {
     TableReference tableReference = table.getTableReference();
     synchronized (BigQueryIOTest.tables) {
       Map<String, TableContainer> dataset =
-          checkNotNull(
-              BigQueryIOTest.tables.get(tableReference.getProjectId(),
-                  tableReference.getDatasetId()),
-              "Tried to get a dataset %s:%s from %s, but no such table was set",
-              tableReference.getProjectId(),
-              tableReference.getDatasetId(),
-              FakeDatasetService.class.getSimpleName());
+          BigQueryIOTest.tables.get(tableReference.getProjectId(), tableReference.getDatasetId());
+      if (dataset == null) {
+        throwNotFound(
+            "Tried to get a dataset %s:%s, but no such dataset was set",
+            tableReference.getProjectId(),
+            tableReference.getDatasetId());
+      }
       TableContainer tableContainer = dataset.get(tableReference.getTableId());
       if (tableContainer == null) {
         tableContainer = new TableContainer(table);
@@ -98,7 +131,16 @@ class FakeDatasetService implements DatasetService, Serializable {
   @Override
   public Dataset getDataset(
       String projectId, String datasetId) throws IOException, InterruptedException {
-    throw new UnsupportedOperationException("Unsupported");
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(projectId, datasetId);
+      if (dataset == null) {
+        throwNotFound("Tried to get a dataset %s:%s, but no such dataset was set",
+            projectId, datasetId);
+      }
+      return new Dataset().setDatasetReference(new DatasetReference()
+          .setDatasetId(datasetId)
+          .setProjectId(projectId));
+    }
   }

   @Override
@@ -117,7 +159,9 @@ class FakeDatasetService implements DatasetService, Serializable {
   @Override
   public void deleteDataset(String projectId, String datasetId)
       throws IOException, InterruptedException {
-    throw new UnsupportedOperationException("Unsupported");
+    synchronized (BigQueryIOTest.tables) {
+      BigQueryIOTest.tables.remove(projectId, datasetId);
+    }
   }

   @Override
@@ -138,8 +182,7 @@ class FakeDatasetService implements DatasetService, Serializable {
       TableContainer tableContainer = getTableContainer(
           ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
       for (int i = 0; i < rowList.size(); ++i) {
-        tableContainer.addRow(rowList.get(i), insertIdList.get(i));
-        dataSize += rowList.get(i).toString().length();
+        dataSize += tableContainer.addRow(rowList.get(i), insertIdList.get(i));
       }
       return dataSize;
     }
@@ -150,23 +193,16 @@ class FakeDatasetService implements DatasetService, Serializable {
       @Nullable String tableDescription) throws IOException, InterruptedException {
     synchronized (BigQueryIOTest.tables) {
-      Map<String, TableContainer> dataset =
-          checkNotNull(
-              BigQueryIOTest.tables.get(tableReference.getProjectId(),
-                  tableReference.getDatasetId()),
-              "Tried to get a dataset %s:%s from %s, but no such dataset was set",
-              tableReference.getProjectId(),
-              tableReference.getDatasetId(),
-              tableReference.getTableId(),
-              FakeDatasetService.class.getSimpleName());
-      TableContainer tableContainer = checkNotNull(dataset.get(tableReference.getTableId()),
-          "Tried to patch a table %s:%s.%s from %s, but no such table was set",
-          tableReference.getProjectId(),
-          tableReference.getDatasetId(),
-          tableReference.getTableId(),
-          FakeDatasetService.class.getSimpleName());
+      TableContainer tableContainer = getTableContainer(tableReference.getProjectId(),
+          tableReference.getDatasetId(), tableReference.getTableId());
       tableContainer.getTable().setDescription(tableDescription);
       return tableContainer.getTable();
     }
   }
+
+  void throwNotFound(String format, Object... args) throws IOException {
+    throw new IOException(
+        new GoogleJsonResponseException.Builder(404,
+            String.format(format, args), new HttpHeaders()).build());
+  }
 }
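
For reference, the pieces above are designed to compose: FakeBigQueryServices hands out the fake job and dataset services, FakeDatasetService keeps table state in the shared in-memory map, and query-based reads are served from rows Base64-encoded into the query string itself. Below is a minimal sketch of how a test might wire them up; the dataset, table name, and rows are illustrative only and are not taken from this commit.

    // Wire the fake services together; both are backed by in-memory state.
    FakeDatasetService datasetService = new FakeDatasetService();
    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
        .withJobService(new FakeJobService())
        .withDatasetService(datasetService);

    // Seed a dataset and a table with a few rows (hypothetical names).
    datasetService.createDataset("project-id", "dataset-id", "", "");
    TableReference tableRef =
        BigQueryHelpers.parseTableSpec("project-id:dataset-id.input_table");
    datasetService.createTable(new Table().setTableReference(tableRef));
    List<TableRow> rows = Lists.newArrayList(
        new TableRow().set("number", 1L),
        new TableRow().set("number", 2L));
    datasetService.insertAll(tableRef, rows, null);

    // Table reads come straight from FakeDatasetService; for query reads,
    // the "query" string is simply the expected rows, Base64-encoded.
    String fakeQuery = FakeBigQueryServices.encodeQuery(rows);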
