http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 7998fc7..d9f53af 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -17,30 +17,51 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.beam.sdk.io.BigQueryIO.fromJsonString;
+import static org.apache.beam.sdk.io.BigQueryIO.toJsonString;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.io.BigQueryIO.BigQueryQuerySource;
+import org.apache.beam.sdk.io.BigQueryIO.BigQueryTableSource;
+import org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup;
+import 
org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup.CleanupOperation;
 import org.apache.beam.sdk.io.BigQueryIO.Status;
+import org.apache.beam.sdk.io.BigQueryIO.TransformingSource;
 import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BigQueryServices;
+import org.apache.beam.sdk.util.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.util.BigQueryServices.JobService;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.PCollection;
 
 import com.google.api.client.util.Data;
 import com.google.api.services.bigquery.model.ErrorProto;
@@ -48,14 +69,21 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.JobStatistics2;
+import com.google.api.services.bigquery.model.JobStatistics4;
 import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 
+import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
@@ -67,18 +95,24 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
 
 /**
  * Tests for BigQueryIO.
  */
 @RunWith(JUnit4.class)
-public class BigQueryIOTest {
+public class BigQueryIOTest implements Serializable {
 
   // Status.UNKNOWN maps to null
   private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of(
@@ -87,111 +121,192 @@ public class BigQueryIOTest {
 
   private static class FakeBigQueryServices implements BigQueryServices {
 
-    private Object[] startJobReturns;
-    private Object[] pollJobStatusReturns;
+    private String[] jsonTableRowReturns = new String[0];
+    private JobService jobService;
+    private DatasetService datasetService;
 
-    /**
-     * Sets the return values for the mock {@link JobService#startLoadJob}.
-     *
-     * <p>Throws if the {@link Object} is a {@link Exception}, returns 
otherwise.
-     */
-    private FakeBigQueryServices startLoadJobReturns(Object... 
startLoadJobReturns) {
-      this.startJobReturns = startLoadJobReturns;
+    public FakeBigQueryServices withJobService(JobService jobService) {
+      this.jobService = jobService;
       return this;
     }
 
-    /**
-     * Sets the return values for the mock {@link JobService#pollJobStatus}.
-     *
-     * <p>Throws if the {@link Object} is a {@link Exception}, returns 
otherwise.
-     */
-    private FakeBigQueryServices pollJobStatusReturns(Object... 
pollJobStatusReturns) {
-      this.pollJobStatusReturns = pollJobStatusReturns;
+    public FakeBigQueryServices withDatasetService(DatasetService 
datasetService) {
+      this.datasetService = datasetService;
+      return this;
+    }
+
+    public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) {
+      this.jsonTableRowReturns = jsonTableRowReturns;
       return this;
     }
 
     @Override
     public JobService getJobService(BigQueryOptions bqOptions) {
-      return new FakeLoadService(startJobReturns, pollJobStatusReturns);
+      return jobService;
     }
 
-    private static class FakeLoadService implements 
BigQueryServices.JobService {
+    @Override
+    public DatasetService getDatasetService(BigQueryOptions bqOptions) {
+      return datasetService;
+    }
+
+    @Override
+    public BigQueryJsonReader getReaderFromTable(
+        BigQueryOptions bqOptions, TableReference tableRef) {
+      return new FakeBigQueryReader(jsonTableRowReturns);
+    }
+
+    @Override
+    public BigQueryJsonReader getReaderFromQuery(
+        BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten) {
+      return new FakeBigQueryReader(jsonTableRowReturns);
+    }
 
-      private Object[] startJobReturns;
-      private Object[] pollJobStatusReturns;
-      private int startLoadJobCallsCount;
-      private int pollJobStatusCallsCount;
+    private static class FakeBigQueryReader implements BigQueryJsonReader {
+      private static final int UNSTARTED = -1;
+      private static final int CLOSED = Integer.MAX_VALUE;
 
-      public FakeLoadService(Object[] startLoadJobReturns, Object[] 
pollJobStatusReturns) {
-        this.startJobReturns = startLoadJobReturns;
-        this.pollJobStatusReturns = pollJobStatusReturns;
-        this.startLoadJobCallsCount = 0;
-        this.pollJobStatusCallsCount = 0;
+      private String[] jsonTableRowReturns;
+      private int currIndex;
+
+      FakeBigQueryReader(String[] jsonTableRowReturns) {
+        this.jsonTableRowReturns = jsonTableRowReturns;
+        this.currIndex = UNSTARTED;
       }
 
       @Override
-      public void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
-          throws InterruptedException, IOException {
-        startJob();
+      public boolean start() throws IOException {
+        assertEquals(UNSTARTED, currIndex);
+        currIndex = 0;
+        return currIndex < jsonTableRowReturns.length;
       }
 
       @Override
-      public void startExtractJob(String jobId, JobConfigurationExtract 
extractConfig)
-          throws InterruptedException, IOException {
-        startJob();
+      public boolean advance() throws IOException {
+        return ++currIndex < jsonTableRowReturns.length;
       }
 
       @Override
-      public void startQueryJob(String jobId, JobConfigurationQuery query, 
boolean dryRun)
-          throws IOException, InterruptedException {
-        startJob();
+      public TableRow getCurrent() throws NoSuchElementException {
+        if (currIndex >= jsonTableRowReturns.length) {
+          throw new NoSuchElementException();
+        }
+        return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class);
       }
 
       @Override
-      public Job pollJob(String projectId, String jobId, int maxAttemps)
-          throws InterruptedException {
-        if (pollJobStatusCallsCount < pollJobStatusReturns.length) {
-          Object ret = pollJobStatusReturns[pollJobStatusCallsCount++];
-          if (ret instanceof Status) {
-            return JOB_STATUS_MAP.get(ret);
-          } else if (ret instanceof InterruptedException) {
-            throw (InterruptedException) ret;
-          } else {
-            throw new RuntimeException("Unexpected return type: " + 
ret.getClass());
-          }
+      public void close() throws IOException {
+        currIndex = CLOSED;
+      }
+    }
+  }
+
+  private static class FakeJobService implements JobService, Serializable {
+
+    private Object[] startJobReturns;
+    private Object[] pollJobReturns;
+    // Both counts will be reset back to zeros after serialization.
+    // This is a work around for DoFn's verifyUnmodified check.
+    private transient int startJobCallsCount;
+    private transient int pollJobStatusCallsCount;
+
+    public FakeJobService() {
+      this.startJobReturns = new Object[0];
+      this.pollJobReturns = new Object[0];
+      this.startJobCallsCount = 0;
+      this.pollJobStatusCallsCount = 0;
+    }
+
+    /**
+     * Sets the return values to mock {@link JobService#startLoadJob},
+     * {@link JobService#startExtractJob} and {@link JobService#startQueryJob}.
+     *
+     * <p>Throws if the {@link Object} is a {@link Exception}, returns 
otherwise.
+     */
+    public FakeJobService startJobReturns(Object... startJobReturns) {
+      this.startJobReturns = startJobReturns;
+      return this;
+    }
+
+    /**
+     * Sets the return values to mock {@link JobService#pollJob}.
+     *
+     * <p>Throws if the {@link Object} is a {@link Exception}, returns 
otherwise.
+     */
+    public FakeJobService pollJobReturns(Object... pollJobReturns) {
+      this.pollJobReturns = pollJobReturns;
+      return this;
+    }
+
+    @Override
+    public void startLoadJob(JobReference jobRef, JobConfigurationLoad 
loadConfig)
+        throws InterruptedException, IOException {
+      startJob();
+    }
+
+    @Override
+    public void startExtractJob(JobReference jobRef, JobConfigurationExtract 
extractConfig)
+        throws InterruptedException, IOException {
+      startJob();
+    }
+
+    @Override
+    public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
+        throws IOException, InterruptedException {
+      startJob();
+    }
+
+    @Override
+    public Job pollJob(JobReference jobRef, int maxAttempts)
+        throws InterruptedException {
+      if (pollJobStatusCallsCount < pollJobReturns.length) {
+        Object ret = pollJobReturns[pollJobStatusCallsCount++];
+        if (ret instanceof Job) {
+          return (Job) ret;
+        } else if (ret instanceof Status) {
+          return JOB_STATUS_MAP.get(ret);
+        } else if (ret instanceof InterruptedException) {
+          throw (InterruptedException) ret;
         } else {
-          throw new RuntimeException(
-              "Exceeded expected number of calls: " + 
pollJobStatusReturns.length);
+          throw new RuntimeException("Unexpected return type: " + 
ret.getClass());
         }
+      } else {
+        throw new RuntimeException(
+            "Exceeded expected number of calls: " + pollJobReturns.length);
       }
+    }
 
-      private void startJob() throws IOException, InterruptedException {
-        if (startLoadJobCallsCount < startJobReturns.length) {
-          Object ret = startJobReturns[startLoadJobCallsCount++];
-          if (ret instanceof IOException) {
-            throw (IOException) ret;
-          } else if (ret instanceof InterruptedException) {
-            throw (InterruptedException) ret;
-          } else {
-            return;
-          }
+    private void startJob() throws IOException, InterruptedException {
+      if (startJobCallsCount < startJobReturns.length) {
+        Object ret = startJobReturns[startJobCallsCount++];
+        if (ret instanceof IOException) {
+          throw (IOException) ret;
+        } else if (ret instanceof InterruptedException) {
+          throw (InterruptedException) ret;
         } else {
-          throw new RuntimeException(
-              "Exceeded expected number of calls: " + startJobReturns.length);
+          return;
         }
+      } else {
+        throw new RuntimeException(
+            "Exceeded expected number of calls: " + startJobReturns.length);
       }
     }
+
+    @Override
+    public JobStatistics dryRunQuery(String projectId, String query)
+        throws InterruptedException, IOException {
+      throw new UnsupportedOperationException();
+    }
   }
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-  @Rule public ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class);
-  @Rule
-  public TemporaryFolder testFolder = new TemporaryFolder();
-  @Mock
-  public BigQueryServices.JobService mockBqLoadService;
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  @Rule public transient ExpectedLogs logged = 
ExpectedLogs.none(BigQueryIO.class);
+  @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
+  @Mock public transient BigQueryServices.JobService mockJobService;
+  @Mock private transient IOChannelFactory mockIOChannelFactory;
+  @Mock private transient DatasetService mockDatasetService;
 
-  private BigQueryOptions bqOptions;
+  private transient BigQueryOptions bqOptions;
 
   private void checkReadTableObject(
       BigQueryIO.Read.Bound bound, String project, String dataset, String 
table) {
@@ -205,16 +320,16 @@ public class BigQueryIOTest {
 
   private void checkReadTableObjectWithValidate(
       BigQueryIO.Read.Bound bound, String project, String dataset, String 
table, boolean validate) {
-    assertEquals(project, bound.table.getProjectId());
-    assertEquals(dataset, bound.table.getDatasetId());
-    assertEquals(table, bound.table.getTableId());
+    assertEquals(project, bound.getTable().getProjectId());
+    assertEquals(dataset, bound.getTable().getDatasetId());
+    assertEquals(table, bound.getTable().getTableId());
     assertNull(bound.query);
     assertEquals(validate, bound.getValidate());
   }
 
   private void checkReadQueryObjectWithValidate(
       BigQueryIO.Read.Bound bound, String query, boolean validate) {
-    assertNull(bound.table);
+    assertNull(bound.getTable());
     assertEquals(query, bound.query);
     assertEquals(validate, bound.getValidate());
   }
@@ -241,10 +356,10 @@ public class BigQueryIOTest {
   }
 
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
     bqOptions.setProject("defaultProject");
-    bqOptions.setTempLocation(testFolder.getRoot().getAbsolutePath() + 
"/BigQueryIOTest/");
+    
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     MockitoAnnotations.initMocks(this);
   }
@@ -315,11 +430,7 @@ public class BigQueryIOTest {
     thrown.expectMessage(
         Matchers.either(Matchers.containsString("Unable to confirm BigQuery 
dataset presence"))
             .or(Matchers.containsString("BigQuery dataset not found for 
table")));
-    try {
-      p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef));
-    } finally {
-      Assert.assertEquals("someproject", tableRef.getProjectId());
-    }
+    p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef));
   }
 
   @Test
@@ -364,10 +475,40 @@ public class BigQueryIOTest {
   }
 
   @Test
+  public void testReadFromTable() {
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService()
+            .startJobReturns("done", "done")
+            .pollJobReturns(Status.UNKNOWN))
+        .readerReturns(
+            toJsonString(new TableRow().set("name", "a").set("number", 1)),
+            toJsonString(new TableRow().set("name", "b").set("number", 2)),
+            toJsonString(new TableRow().set("name", "c").set("number", 3)));
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    PCollection<String> output = p
+        .apply(BigQueryIO.Read.from("foo.com:project:somedataset.sometable")
+            .withTestServices(fakeBqServices)
+            .withoutValidation())
+        .apply(ParDo.of(new DoFn<TableRow, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            c.output((String) c.element().get("name"));
+          }
+        }));
+
+    PAssert.that(output)
+        .containsInAnyOrder(ImmutableList.of("a", "b", "c"));
+
+    p.run();
+  }
+
+  @Test
   public void testCustomSink() throws Exception {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .startLoadJobReturns("done", "done", "done")
-        .pollJobStatusReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED);
+        .withJobService(new FakeJobService()
+            .startJobReturns("done", "done", "done")
+            .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED));
 
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
@@ -398,8 +539,9 @@ public class BigQueryIOTest {
   @Test
   public void testCustomSinkUnknown() throws Exception {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .startLoadJobReturns("done", "done")
-        .pollJobStatusReturns(Status.FAILED, Status.UNKNOWN);
+        .withJobService(new FakeJobService()
+            .startJobReturns("done", "done")
+            .pollJobReturns(Status.FAILED, Status.UNKNOWN));
 
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
@@ -717,4 +859,240 @@ public class BigQueryIOTest {
         .apply(Create.<TableRow>of())
         .apply(BigQueryIO.Write.named("name"));
   }
+
+  @Test
+  public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .readerReturns(
+            toJsonString(new TableRow().set("name", "a").set("number", "1")),
+            toJsonString(new TableRow().set("name", "b").set("number", "2")),
+            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+
+    String jobIdToken = "testJobIdToken";
+    String jsonTable = 
toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name"));
+    String extractDestinationDir = "mock://tempLocation";
+    BoundedSource<TableRow> bqSource =
+        BigQueryTableSource.create(jobIdToken, jsonTable, 
extractDestinationDir, fakeBqServices);
+
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", "1"),
+        new TableRow().set("name", "b").set("number", "2"),
+        new TableRow().set("name", "c").set("number", "3"));
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(bqSource, options),
+        CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, 
options);
+  }
+
+  @Test
+  public void testBigQueryTableSourceInitSplit() throws Exception {
+    Job extractJob = new Job();
+    JobStatistics jobStats = new JobStatistics();
+    JobStatistics4 extractStats = new JobStatistics4();
+    extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
+    jobStats.setExtract(extractStats);
+    extractJob.setStatus(new JobStatus())
+        .setStatistics(jobStats);
+
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .withDatasetService(mockDatasetService)
+        .readerReturns(
+            toJsonString(new TableRow().set("name", "a").set("number", "1")),
+            toJsonString(new TableRow().set("name", "b").set("number", "2")),
+            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+
+    String jobIdToken = "testJobIdToken";
+    String jsonTable = 
toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name"));
+    String extractDestinationDir = "mock://tempLocation";
+    BoundedSource<TableRow> bqSource =
+        BigQueryTableSource.create(jobIdToken, jsonTable, 
extractDestinationDir, fakeBqServices);
+
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", "1"),
+        new TableRow().set("name", "b").set("number", "2"),
+        new TableRow().set("name", "c").set("number", "3"));
+
+    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
+        .thenReturn(extractJob);
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setTempLocation("mock://tempLocation");
+
+    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    when(mockIOChannelFactory.resolve(anyString(), anyString()))
+        .thenReturn("mock://tempLocation/output");
+    when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
+        .thenReturn(new Table().setSchema(new TableSchema()));
+
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(bqSource, options),
+        CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, 
options);
+
+    List<? extends BoundedSource<TableRow>> sources = 
bqSource.splitIntoBundles(100, options);
+    assertEquals(1, sources.size());
+    BoundedSource<TableRow> actual = sources.get(0);
+    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
+
+    Mockito.verify(mockJobService)
+        .startExtractJob(Mockito.<JobReference>any(), 
Mockito.<JobConfigurationExtract>any());
+  }
+
+  @Test
+  public void testBigQueryQuerySourceInitSplit() throws Exception {
+    TableReference dryRunTable = new TableReference();
+
+    Job queryJob = new Job();
+    JobStatistics queryJobStats = new JobStatistics();
+    JobStatistics2 queryStats = new JobStatistics2();
+    queryStats.setReferencedTables(ImmutableList.of(dryRunTable));
+    queryJobStats.setQuery(queryStats);
+    queryJob.setStatus(new JobStatus())
+        .setStatistics(queryJobStats);
+
+    Job extractJob = new Job();
+    JobStatistics extractJobStats = new JobStatistics();
+    JobStatistics4 extractStats = new JobStatistics4();
+    extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
+    extractJobStats.setExtract(extractStats);
+    extractJob.setStatus(new JobStatus())
+        .setStatistics(extractJobStats);
+
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .withDatasetService(mockDatasetService)
+        .readerReturns(
+            toJsonString(new TableRow().set("name", "a").set("number", "1")),
+            toJsonString(new TableRow().set("name", "b").set("number", "2")),
+            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+
+    String jobIdToken = "testJobIdToken";
+    String extractDestinationDir = "mock://tempLocation";
+    TableReference destinationTable = 
BigQueryIO.parseTableSpec("project.data_set.table_name");
+    String jsonDestinationTable = toJsonString(destinationTable);
+    BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
+        jobIdToken, "query", jsonDestinationTable, true /* flattenResults */,
+        extractDestinationDir, fakeBqServices);
+
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", "1"),
+        new TableRow().set("name", "b").set("number", "2"),
+        new TableRow().set("name", "c").set("number", "3"));
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setTempLocation(extractDestinationDir);
+
+    TableReference queryTable = new TableReference()
+        .setProjectId("testProejct")
+        .setDatasetId("testDataset")
+        .setTableId("testTable");
+    when(mockJobService.dryRunQuery(anyString(), anyString()))
+        .thenReturn(new JobStatistics().setQuery(
+            new JobStatistics2()
+                .setTotalBytesProcessed(100L)
+                .setReferencedTables(ImmutableList.of(queryTable))));
+    when(mockDatasetService.getTable(
+        eq(queryTable.getProjectId()), eq(queryTable.getDatasetId()), 
eq(queryTable.getTableId())))
+        .thenReturn(new Table().setSchema(new TableSchema()));
+    when(mockDatasetService.getTable(
+        eq(destinationTable.getProjectId()),
+        eq(destinationTable.getDatasetId()),
+        eq(destinationTable.getTableId())))
+        .thenReturn(new Table().setSchema(new TableSchema()));
+    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    when(mockIOChannelFactory.resolve(anyString(), anyString()))
+        .thenReturn("mock://tempLocation/output");
+    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
+        .thenReturn(extractJob);
+
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(bqSource, options),
+        CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, 
options);
+
+    List<? extends BoundedSource<TableRow>> sources = 
bqSource.splitIntoBundles(100, options);
+    assertEquals(1, sources.size());
+    BoundedSource<TableRow> actual = sources.get(0);
+    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
+
+    Mockito.verify(mockJobService)
+        .startQueryJob(
+            Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
+    Mockito.verify(mockJobService)
+        .startExtractJob(Mockito.<JobReference>any(), 
Mockito.<JobConfigurationExtract>any());
+    Mockito.verify(mockDatasetService)
+        .createDataset(anyString(), anyString(), anyString(), anyString());
+  }
+
+  @Test
+  public void testTransformingSource() throws Exception {
+    int numElements = 10000;
+    @SuppressWarnings("deprecation")
+    BoundedSource<Long> longSource = CountingSource.upTo(numElements);
+    SerializableFunction<Long, String> toStringFn =
+        new SerializableFunction<Long, String>() {
+          @Override
+          public String apply(Long input) {
+            return input.toString();
+         }};
+    BoundedSource<String> stringSource = new TransformingSource<>(
+        longSource, toStringFn, StringUtf8Coder.of());
+
+    List<String> expected = Lists.newArrayList();
+    for (int i = 0; i < numElements; i++) {
+      expected.add(String.valueOf(i));
+    }
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(stringSource, options),
+        CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        stringSource, 100, 0.3, 
ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options);
+
+    SourceTestUtils.assertSourcesEqualReferenceSource(
+        stringSource, stringSource.splitIntoBundles(100, options), options);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testPassThroughThenCleanup() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<Integer> output = p
+        .apply(Create.of(1, 2, 3))
+        .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() {
+          @Override
+          void cleanup(PipelineOptions options) throws Exception {
+            // no-op
+          }}));
+
+    PAssert.that(output).containsInAnyOrder(1, 2, 3);
+
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testPassThroughThenCleanupExecuted() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    p.apply(Create.<Integer>of())
+        .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() {
+          @Override
+          void cleanup(PipelineOptions options) throws Exception {
+            throw new RuntimeException("cleanup executed");
+          }}));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("cleanup executed");
+
+    p.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
index 238deed..3ec2b37 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
@@ -43,6 +43,7 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.collect.ImmutableList;
@@ -117,6 +118,7 @@ public class BigQueryServicesImplTest {
     BackOff backoff = new AttemptBoundedExponentialBackOff(
         5 /* attempts */, 1000 /* initialIntervalMillis */);
     JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, 
sleeper, backoff);
+
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
     verify(response, times(1)).getContentType();
@@ -198,8 +200,10 @@ public class BigQueryServicesImplTest {
 
     BigQueryServicesImpl.JobServiceImpl jobService =
         new BigQueryServicesImpl.JobServiceImpl(bigquery);
-    Job job =
-        jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, 
BackOff.ZERO_BACKOFF);
+    JobReference jobRef = new JobReference()
+        .setProjectId("projectId")
+        .setJobId("jobId");
+    Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, 
BackOff.ZERO_BACKOFF);
 
     assertEquals(testJob, job);
     verify(response, times(1)).getStatusCode();
@@ -221,8 +225,10 @@ public class BigQueryServicesImplTest {
 
     BigQueryServicesImpl.JobServiceImpl jobService =
         new BigQueryServicesImpl.JobServiceImpl(bigquery);
-    Job job =
-        jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, 
BackOff.ZERO_BACKOFF);
+    JobReference jobRef = new JobReference()
+        .setProjectId("projectId")
+        .setJobId("jobId");
+    Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, 
BackOff.ZERO_BACKOFF);
 
     assertEquals(testJob, job);
     verify(response, times(1)).getStatusCode();
@@ -244,8 +250,10 @@ public class BigQueryServicesImplTest {
 
     BigQueryServicesImpl.JobServiceImpl jobService =
         new BigQueryServicesImpl.JobServiceImpl(bigquery);
-    Job job =
-        jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, 
BackOff.STOP_BACKOFF);
+    JobReference jobRef = new JobReference()
+        .setProjectId("projectId")
+        .setJobId("jobId");
+    Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, 
BackOff.STOP_BACKOFF);
 
     assertEquals(null, job);
     verify(response, times(1)).getStatusCode();
@@ -253,6 +261,26 @@ public class BigQueryServicesImplTest {
     verify(response, times(1)).getContentType();
   }
 
+  @Test
+  public void testExecuteWithRetries() throws IOException, 
InterruptedException {
+    Table testTable = new Table();
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent()).thenReturn(toStream(testTable));
+
+    Table table = BigQueryServicesImpl.executeWithRetries(
+        bigquery.tables().get("projectId", "datasetId", "tableId"),
+        "Failed to get table.",
+        Sleeper.DEFAULT,
+        BackOff.STOP_BACKOFF);
+
+    assertEquals(testTable, table);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
   /** A helper to wrap a {@link GenericJson} object in a content stream. */
   private static InputStream toStream(GenericJson content) throws IOException {
     return new 
ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));

Reply via email to