http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
new file mode 100644
index 0000000..1d93fa3
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Implementation of DoFn to perform streaming BigQuery write.
+ */
+@SystemDoFnInternal
+@VisibleForTesting
+class StreamingWriteFn
+    extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
+  /** TableSchema in JSON. Use String to make the class Serializable. */
+  @Nullable
+  private final ValueProvider<String> jsonTableSchema;
+
+  @Nullable private final String tableDescription;
+
+  private final BigQueryServices bqServices;
+
+  /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
+  private transient Map<String, List<TableRow>> tableRows;
+
+  private final Write.CreateDisposition createDisposition;
+
+  /** The list of unique ids for each BigQuery table row. */
+  private transient Map<String, List<String>> uniqueIdsForTableRows;
+
+  /** The list of tables created so far, so we don't try the creation
+      each time. */
+  private static Set<String> createdTables =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  /** Tracks bytes written, exposed as "ByteCount" Counter. */
+  private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, 
"ByteCount");
+
+  /** Constructor. */
+  StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema,
+                   Write.CreateDisposition createDisposition,
+                   @Nullable String tableDescription, BigQueryServices 
bqServices) {
+    this.jsonTableSchema = schema == null ? null :
+        NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
+    this.createDisposition = createDisposition;
+    this.bqServices = checkNotNull(bqServices, "bqServices");
+    this.tableDescription = tableDescription;
+  }
+
+  /**
+   * Clear the cached map of created tables. Used for testing.
+   */
+  static void clearCreatedTables() {
+    synchronized (createdTables) {
+      createdTables.clear();
+    }
+  }
+
+  /** Prepares a target BigQuery table. */
+  @StartBundle
+  public void startBundle(Context context) {
+    tableRows = new HashMap<>();
+    uniqueIdsForTableRows = new HashMap<>();
+  }
+
+  /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
+  @ProcessElement
+  public void processElement(ProcessContext context) {
+    String tableSpec = context.element().getKey().getKey();
+    List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, 
tableSpec);
+    List<String> uniqueIds = 
BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows,
+        tableSpec);
+
+    rows.add(context.element().getValue().tableRow);
+    uniqueIds.add(context.element().getValue().uniqueId);
+  }
+
+  /** Writes the accumulated rows into BigQuery with streaming API. */
+  @FinishBundle
+  public void finishBundle(Context context) throws Exception {
+    BigQueryOptions options = 
context.getPipelineOptions().as(BigQueryOptions.class);
+
+    for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
+      TableReference tableReference = getOrCreateTable(options, 
entry.getKey());
+      flushRows(tableReference, entry.getValue(),
+          uniqueIdsForTableRows.get(entry.getKey()), options);
+    }
+    tableRows.clear();
+    uniqueIdsForTableRows.clear();
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+
+    builder
+        .addIfNotNull(DisplayData.item("schema", jsonTableSchema)
+          .withLabel("Table Schema"))
+        .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+          .withLabel("Table Description"));
+  }
+
+  public TableReference getOrCreateTable(BigQueryOptions options, String 
tableSpec)
+      throws InterruptedException, IOException {
+    TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
+    if (createDisposition != createDisposition.CREATE_NEVER
+        && !createdTables.contains(tableSpec)) {
+      synchronized (createdTables) {
+        // Another thread may have succeeded in creating the table in the 
meanwhile, so
+        // check again. This check isn't needed for correctness, but we add it 
to prevent
+        // every thread from attempting a create and overwhelming our BigQuery 
quota.
+        DatasetService datasetService = bqServices.getDatasetService(options);
+        if (!createdTables.contains(tableSpec)) {
+          if (datasetService.getTable(tableReference) == null) {
+            TableSchema tableSchema = BigQueryIO.JSON_FACTORY.fromString(
+                jsonTableSchema.get(), TableSchema.class);
+            datasetService.createTable(
+                new Table()
+                    .setTableReference(tableReference)
+                    .setSchema(tableSchema)
+                    .setDescription(tableDescription));
+          }
+          createdTables.add(tableSpec);
+        }
+      }
+    }
+    return tableReference;
+  }
+
+  /**
+   * Writes the accumulated rows into BigQuery with streaming API.
+   */
+  private void flushRows(TableReference tableReference,
+      List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions 
options)
+          throws InterruptedException {
+    if (!tableRows.isEmpty()) {
+      try {
+        long totalBytes = bqServices.getDatasetService(options).insertAll(
+            tableReference, tableRows, uniqueIds);
+        byteCounter.inc(totalBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfo.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfo.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfo.java
new file mode 100644
index 0000000..4a59ab6
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+
+/**
+ * Encapsulates a {@link TableRow} along with a unique insertion id.
+ */
+class TableRowInfo {
+  TableRowInfo(TableRow tableRow, String uniqueId) {
+    this.tableRow = tableRow;
+    this.uniqueId = uniqueId;
+  }
+
+  final TableRow tableRow;
+  final String uniqueId;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
new file mode 100644
index 0000000..5e8fa29
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+
+/**
+ * Defines a coder for {@link TableRowInfo} objects.
+ */
+@VisibleForTesting
+class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
+  private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
+
+  @JsonCreator
+  public static TableRowInfoCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(TableRowInfo value, OutputStream outStream, Context 
context)
+      throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null value");
+    }
+    tableRowCoder.encode(value.tableRow, outStream, context.nested());
+    idCoder.encode(value.uniqueId, outStream, context);
+  }
+
+  @Override
+  public TableRowInfo decode(InputStream inStream, Context context)
+      throws IOException {
+    return new TableRowInfo(
+        tableRowCoder.decode(inStream, context.nested()),
+        idCoder.decode(inStream, context));
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(this, "TableRows are not 
deterministic.");
+  }
+
+  TableRowJsonCoder tableRowCoder = TableRowJsonCoder.of();
+  StringUtf8Coder idCoder = StringUtf8Coder.of();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
new file mode 100644
index 0000000..014c498
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.io.CountingOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes {@TableRow} objects out to a file. Used when doing batch load jobs 
into BigQuery.
+ */
+class TableRowWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
+
+  private static final Coder<TableRow> CODER = TableRowJsonCoder.of();
+  private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
+  private final String tempFilePrefix;
+  private String id;
+  private String fileName;
+  private WritableByteChannel channel;
+  protected String mimeType = MimeTypes.TEXT;
+  private CountingOutputStream out;
+
+  TableRowWriter(String basename) {
+    this.tempFilePrefix = basename;
+  }
+
+  public final void open(String uId) throws Exception {
+    id = uId;
+    fileName = tempFilePrefix + id;
+    LOG.debug("Opening {}.", fileName);
+    channel = IOChannelUtils.create(fileName, mimeType);
+    try {
+      out = new CountingOutputStream(Channels.newOutputStream(channel));
+      LOG.debug("Writing header to {}.", fileName);
+    } catch (Exception e) {
+      try {
+        LOG.error("Writing header to {} failed, closing channel.", fileName);
+        channel.close();
+      } catch (IOException closeException) {
+        LOG.error("Closing channel for {} failed", fileName);
+      }
+      throw e;
+    }
+    LOG.debug("Starting write of bundle {} to {}.", this.id, fileName);
+  }
+
+  public void write(TableRow value) throws Exception {
+    CODER.encode(value, out, Context.OUTER);
+    out.write(NEWLINE);
+  }
+
+  public final KV<String, Long> close() throws IOException {
+    channel.close();
+    return KV.of(fileName, out.getCount());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
new file mode 100644
index 0000000..a6608e4
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+
+/**
+ * Fn that tags each table row with a unique id and destination table.
+ * To avoid calling UUID.randomUUID() for each element, which can be costly,
+ * a randomUUID is generated only once per bucket of data. The actual unique
+ * id is created by concatenating this randomUUID with a sequential number.
+ */
+@VisibleForTesting
+class TagWithUniqueIdsAndTable<T>
+    extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> {
+  /** TableSpec to write to. */
+  private final ValueProvider<String> tableSpec;
+
+  /** User function mapping windowed values to {@link TableReference} in JSON. 
*/
+  private final SerializableFunction<ValueInSingleWindow<T>, TableReference> 
tableRefFunction;
+
+  /** User function mapping user type to a TableRow. */
+  private final SerializableFunction<T, TableRow> formatFunction;
+
+  private transient String randomUUID;
+  private transient long sequenceNo = 0L;
+
+  TagWithUniqueIdsAndTable(BigQueryOptions options,
+                           ValueProvider<TableReference> table,
+                           SerializableFunction<ValueInSingleWindow<T>, 
TableReference>
+                               tableRefFunction,
+                           SerializableFunction<T, TableRow> formatFunction) {
+    checkArgument(table == null ^ tableRefFunction == null,
+        "Exactly one of table or tableRefFunction should be set");
+    if (table != null) {
+      if (table.isAccessible() && 
Strings.isNullOrEmpty(table.get().getProjectId())) {
+        TableReference tableRef = table.get()
+            .setProjectId(options.as(BigQueryOptions.class).getProject());
+        table = NestedValueProvider.of(
+            StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
+            new JsonTableRefToTableRef());
+      }
+      this.tableSpec = NestedValueProvider.of(table, new 
TableRefToTableSpec());
+    } else {
+      tableSpec = null;
+    }
+    this.tableRefFunction = tableRefFunction;
+    this.formatFunction = formatFunction;
+  }
+
+
+  @StartBundle
+  public void startBundle(Context context) {
+    randomUUID = UUID.randomUUID().toString();
+  }
+
+  /** Tag the input with a unique id. */
+  @ProcessElement
+  public void processElement(ProcessContext context, BoundedWindow window) 
throws IOException {
+    String uniqueId = randomUUID + sequenceNo++;
+    ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
+    String tableSpec = tableSpecFromWindowedValue(
+        context.getPipelineOptions().as(BigQueryOptions.class),
+        ValueInSingleWindow.of(context.element(), context.timestamp(), window, 
context.pane()));
+    // We output on keys 0-50 to ensure that there's enough batching for
+    // BigQuery.
+    context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 
50)),
+        new TableRowInfo(formatFunction.apply(context.element()), uniqueId)));
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+
+    builder.addIfNotNull(DisplayData.item("table", tableSpec));
+    if (tableRefFunction != null) {
+      builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+        .withLabel("Table Reference Function"));
+    }
+  }
+
+  @VisibleForTesting
+  ValueProvider<String> getTableSpec() {
+    return tableSpec;
+  }
+
+  private String tableSpecFromWindowedValue(BigQueryOptions options,
+                                            ValueInSingleWindow<T> value) {
+    if (tableSpec != null) {
+      return tableSpec.get();
+    } else {
+      TableReference table = tableRefFunction.apply(value);
+      if (table.getProjectId() == null) {
+        table.setProjectId(options.getProject());
+      }
+      return BigQueryHelpers.toTableSpec(table);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
index a86adfb..f7d8252 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
new file mode 100644
index 0000000..6219226
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.UUID;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes each bundle of {@link TableRow} elements out to a separate file using
+ * {@link TableRowWriter}.
+ */
+class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(WriteBundles.class);
+
+  private transient TableRowWriter writer = null;
+  private final String tempFilePrefix;
+
+  WriteBundles(String tempFilePrefix) {
+    this.tempFilePrefix = tempFilePrefix;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    if (writer == null) {
+      writer = new TableRowWriter(tempFilePrefix);
+      writer.open(UUID.randomUUID().toString());
+      LOG.debug("Done opening writer {}", writer);
+    }
+    try {
+      writer.write(c.element());
+    } catch (Exception e) {
+      // Discard write result and close the write.
+      try {
+        writer.close();
+        // The writer does not need to be reset, as this DoFn cannot be reused.
+      } catch (Exception closeException) {
+        // Do not mask the exception that caused the write to fail.
+        e.addSuppressed(closeException);
+      }
+      throw e;
+    }
+  }
+
+  @FinishBundle
+  public void finishBundle(Context c) throws Exception {
+    if (writer != null) {
+      c.output(writer.close());
+      writer = null;
+    }
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+
+    builder
+        .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+            .withLabel("Temporary File Prefix"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
new file mode 100644
index 0000000..f4bf198
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.UUID;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
/**
 * Partitions temporary files based on number of files and file sizes.
 *
 * <p>Reads the (filename, byte size) pairs produced by the file-writing stage from a side
 * input and groups the files into partitions that each stay within BigQuery's per-load-job
 * limits ({@code Write.MAX_NUM_FILES} files, {@code Write.MAX_SIZE_BYTES} bytes). Each
 * partition is emitted as (partitionId, filenames); a result that fits in a single
 * partition is routed to {@code singlePartitionTag}, otherwise all partitions go to
 * {@code multiPartitionsTag}.
 */
class WritePartition extends DoFn<String, KV<Long, List<String>>> {
  private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
  private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
  private TupleTag<KV<Long, List<String>>> singlePartitionTag;

  public WritePartition(
      PCollectionView<Iterable<KV<String, Long>>> resultsView,
      TupleTag<KV<Long, List<String>>> multiPartitionsTag,
      TupleTag<KV<Long, List<String>>> singlePartitionTag) {
    this.resultsView = resultsView;
    this.multiPartitionsTag = multiPartitionsTag;
    this.singlePartitionTag = singlePartitionTag;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
    if (results.isEmpty()) {
      // No rows were written at all: create one empty file (the element is the temp-file
      // prefix) so the downstream load job still runs and produces an empty table.
      TableRowWriter writer = new TableRowWriter(c.element());
      writer.open(UUID.randomUUID().toString());
      results.add(writer.close());
    }

    long partitionId = 0;
    int currNumFiles = 0;
    long currSizeBytes = 0;
    List<String> currResults = Lists.newArrayList();
    for (int i = 0; i < results.size(); ++i) {
      KV<String, Long> fileResult = results.get(i);
      // Flush the current partition before adding this file would exceed either limit.
      // Note partitionId is pre-incremented, so emitted ids start at 1.
      if (currNumFiles + 1 > Write.MAX_NUM_FILES
          || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
        c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
        currResults = Lists.newArrayList();
        currNumFiles = 0;
        currSizeBytes = 0;
      }
      ++currNumFiles;
      currSizeBytes += fileResult.getValue();
      currResults.add(fileResult.getKey());
    }
    // partitionId == 0 means nothing was flushed above, so everything fits in one
    // partition; emit the remainder to the single-partition output. Otherwise emit the
    // final partial partition alongside the ones already flushed.
    if (partitionId == 0) {
      c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults));
    } else {
      c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
    }
  }
}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
new file mode 100644
index 0000000..8cb9439
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * Copies temporary tables to the final destination table.
 *
 * <p>Reads the JSON-serialized {@link TableReference}s of the temporary tables from a
 * side input, issues a single BigQuery copy job that writes them to the destination
 * table according to the configured write/create dispositions, and then deletes the
 * temporary tables. If no temp tables are present the element is a no-op.
 */
class WriteRename extends DoFn<String, Void> {
  private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);

  private final BigQueryServices bqServices;
  // Side input carrying the unique token used as the prefix for copy job ids.
  private final PCollectionView<String> jobIdToken;
  // JSON-serialized TableReference of the destination table.
  private final ValueProvider<String> jsonTableRef;
  private final WriteDisposition writeDisposition;
  private final CreateDisposition createDisposition;
  // Side input carrying the JSON-serialized TableReferences of the temp tables.
  private final PCollectionView<Iterable<String>> tempTablesView;
  // Optional human-readable description patched onto the destination table on success.
  @Nullable
  private final String tableDescription;

  public WriteRename(
      BigQueryServices bqServices,
      PCollectionView<String> jobIdToken,
      ValueProvider<String> jsonTableRef,
      WriteDisposition writeDisposition,
      CreateDisposition createDisposition,
      PCollectionView<Iterable<String>> tempTablesView,
      @Nullable String tableDescription) {
    this.bqServices = bqServices;
    this.jobIdToken = jobIdToken;
    this.jsonTableRef = jsonTableRef;
    this.writeDisposition = writeDisposition;
    this.createDisposition = createDisposition;
    this.tempTablesView = tempTablesView;
    this.tableDescription = tableDescription;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));

    // Do not copy if no temp tables are provided
    if (tempTablesJson.size() == 0) {
      return;
    }

    // Deserialize the temp table references from their JSON side-input form.
    List<TableReference> tempTables = Lists.newArrayList();
    for (String table : tempTablesJson) {
      tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
    }
    copy(
        bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
        c.sideInput(jobIdToken),
        BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class),
        tempTables,
        writeDisposition,
        createDisposition,
        tableDescription);

    // Cleanup runs only after copy() returned normally; a failed copy throws
    // above, deliberately leaving the temp tables in place.
    DatasetService tableService =
        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
    removeTemporaryTables(tableService, tempTables);
  }

  /**
   * Runs a BigQuery table-copy job from {@code tempTables} into {@code ref},
   * retrying up to {@link Write#MAX_RETRY_JOBS} times when a job reports FAILED.
   *
   * @throws RuntimeException if a job ends in UNKNOWN status or all retries fail
   */
  private void copy(
      JobService jobService,
      DatasetService datasetService,
      String jobIdPrefix,
      TableReference ref,
      List<TableReference> tempTables,
      WriteDisposition writeDisposition,
      CreateDisposition createDisposition,
      @Nullable String tableDescription) throws InterruptedException, IOException {
    JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy()
        .setSourceTables(tempTables)
        .setDestinationTable(ref)
        .setWriteDisposition(writeDisposition.name())
        .setCreateDisposition(createDisposition.name());

    String projectId = ref.getProjectId();
    Job lastFailedCopyJob = null;
    for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
      // The retry index is baked into the job id so each attempt is distinct.
      String jobId = jobIdPrefix + "-" + i;
      JobReference jobRef = new JobReference()
          .setProjectId(projectId)
          .setJobId(jobId);
      jobService.startCopyJob(jobRef, copyConfig);
      Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
      Status jobStatus = BigQueryHelpers.parseStatus(copyJob);
      switch (jobStatus) {
        case SUCCEEDED:
          if (tableDescription != null) {
            datasetService.patchTableDescription(ref, tableDescription);
          }
          return;
        case UNKNOWN:
          // Polling gave no terminal status; fail rather than retry blindly.
          throw new RuntimeException(String.format(
              "UNKNOWN status of copy job [%s]: %s.", jobId,
              BigQueryHelpers.jobToPrettyString(copyJob)));
        case FAILED:
          lastFailedCopyJob = copyJob;
          continue;
        default:
          throw new IllegalStateException(String.format(
              "Unexpected status [%s] of load job: %s.",
              jobStatus, BigQueryHelpers.jobToPrettyString(copyJob)));
      }
    }
    throw new RuntimeException(String.format(
        "Failed to create copy job with id prefix %s, "
            + "reached max retries: %d, last failed copy job: %s.",
        jobIdPrefix,
        Write.MAX_RETRY_JOBS,
        BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
  }

  /**
   * Best-effort deletion of the given temporary tables. Failures are logged and
   * swallowed so that cleanup never fails the pipeline.
   */
  static void removeTemporaryTables(DatasetService tableService,
      List<TableReference> tempTables) {
    for (TableReference tableRef : tempTables) {
      try {
        LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef));
        tableService.deleteTable(tableRef);
      } catch (Exception e) {
        LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e);
      }
    }
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);

    builder
        .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
            .withLabel("Table Reference"))
        .add(DisplayData.item("writeDisposition", writeDisposition.toString())
            .withLabel("Write Disposition"))
        .add(DisplayData.item("createDisposition", createDisposition.toString())
            .withLabel("Create Disposition"));
  }
}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
new file mode 100644
index 0000000..29680ad
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.FileIOChannelFactory;
+import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
/**
 * Writes partitions to BigQuery tables.
 *
 * <p>Each input element pairs a partition id with the list of files making up that
 * partition. One BigQuery load job is issued per partition. When there are multiple
 * partitions, each loads into a temporary table named after the job id prefix (to be
 * copied into the destination by a later step); a single partition loads directly
 * into the destination table. The JSON-serialized {@link TableReference} actually
 * written to is emitted as output, and the partition's temporary files are removed
 * after a successful load.
 */
class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
  private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);

  // True when all data fits in one partition and loads straight into the destination.
  private final boolean singlePartition;
  private final BigQueryServices bqServices;
  // Side input carrying the unique token used as the prefix for load job ids.
  private final PCollectionView<String> jobIdToken;
  // Prefix under which the partition's temporary files live; used for cleanup.
  private final String tempFilePrefix;
  // JSON-serialized TableReference of the destination table.
  private final ValueProvider<String> jsonTableRef;
  // JSON-serialized TableSchema; may be null when no schema is supplied.
  private final ValueProvider<String> jsonSchema;
  private final WriteDisposition writeDisposition;
  private final CreateDisposition createDisposition;
  // Optional human-readable description patched onto the table on success.
  @Nullable
  private final String tableDescription;

  public WriteTables(
      boolean singlePartition,
      BigQueryServices bqServices,
      PCollectionView<String> jobIdToken,
      String tempFilePrefix,
      ValueProvider<String> jsonTableRef,
      ValueProvider<String> jsonSchema,
      WriteDisposition writeDisposition,
      CreateDisposition createDisposition,
      @Nullable String tableDescription) {
    this.singlePartition = singlePartition;
    this.bqServices = bqServices;
    this.jobIdToken = jobIdToken;
    this.tempFilePrefix = tempFilePrefix;
    this.jsonTableRef = jsonTableRef;
    this.jsonSchema = jsonSchema;
    this.writeDisposition = writeDisposition;
    this.createDisposition = createDisposition;
    this.tableDescription = tableDescription;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    // Each key carries exactly one partition; take the first (and only) value.
    List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
    // Job id is the shared token plus the zero-padded partition id.
    String jobIdPrefix = String.format(
        c.sideInput(jobIdToken) + "_%05d", c.element().getKey());
    TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(),
        TableReference.class);
    if (!singlePartition) {
      // Multiple partitions: load into a temp table named after the job id;
      // a later copy step merges the temp tables into the destination.
      ref.setTableId(jobIdPrefix);
    }

    load(
        bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
        jobIdPrefix,
        ref,
        BigQueryHelpers.fromJsonString(
            jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
        partition,
        writeDisposition,
        createDisposition,
        tableDescription);
    // Emit the table actually written to so downstream steps can find it.
    c.output(BigQueryHelpers.toJsonString(ref));

    // Cleanup runs only after load() returned normally; a failed load throws
    // above, deliberately leaving the files in place.
    removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
  }

  /**
   * Runs a BigQuery load job from {@code gcsUris} into {@code ref}, retrying up
   * to {@link Write#MAX_RETRY_JOBS} times when a job reports FAILED.
   *
   * @throws RuntimeException if a job ends in UNKNOWN status or all retries fail
   */
  private void load(
      JobService jobService,
      DatasetService datasetService,
      String jobIdPrefix,
      TableReference ref,
      @Nullable TableSchema schema,
      List<String> gcsUris,
      WriteDisposition writeDisposition,
      CreateDisposition createDisposition,
      @Nullable String tableDescription) throws InterruptedException, IOException {
    JobConfigurationLoad loadConfig = new JobConfigurationLoad()
        .setDestinationTable(ref)
        .setSchema(schema)
        .setSourceUris(gcsUris)
        .setWriteDisposition(writeDisposition.name())
        .setCreateDisposition(createDisposition.name())
        .setSourceFormat("NEWLINE_DELIMITED_JSON");

    String projectId = ref.getProjectId();
    Job lastFailedLoadJob = null;
    for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
      // The retry index is baked into the job id so each attempt is distinct.
      String jobId = jobIdPrefix + "-" + i;
      JobReference jobRef = new JobReference()
          .setProjectId(projectId)
          .setJobId(jobId);
      jobService.startLoadJob(jobRef, loadConfig);
      Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
      Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
      switch (jobStatus) {
        case SUCCEEDED:
          if (tableDescription != null) {
            datasetService.patchTableDescription(ref, tableDescription);
          }
          return;
        case UNKNOWN:
          // Polling gave no terminal status; fail rather than retry blindly.
          throw new RuntimeException(String.format(
              "UNKNOWN status of load job [%s]: %s.", jobId,
              BigQueryHelpers.jobToPrettyString(loadJob)));
        case FAILED:
          lastFailedLoadJob = loadJob;
          continue;
        default:
          throw new IllegalStateException(String.format(
              "Unexpected status [%s] of load job: %s.",
              jobStatus, BigQueryHelpers.jobToPrettyString(loadJob)));
      }
    }
    throw new RuntimeException(String.format(
        "Failed to create load job with id prefix %s, "
            + "reached max retries: %d, last failed load job: %s.",
        jobIdPrefix,
        Write.MAX_RETRY_JOBS,
        BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
  }

  /**
   * Deletes the given temporary files, dispatching on the file system implied by
   * {@code tempFilePrefix} (GCS or local).
   *
   * @throws IOException if the prefix belongs to an unrecognized file system
   */
  static void removeTemporaryFiles(
      PipelineOptions options,
      String tempFilePrefix,
      Collection<String> files)
      throws IOException {
    IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
    if (factory instanceof GcsIOChannelFactory) {
      // GCS: bulk-delete via GcsUtil.
      GcsUtil gcsUtil = new GcsUtilFactory().create(options);
      gcsUtil.remove(files);
    } else if (factory instanceof FileIOChannelFactory) {
      for (String filename : files) {
        LOG.debug("Removing file {}", filename);
        // deleteIfExists returns false when the file was already absent.
        boolean exists = Files.deleteIfExists(Paths.get(filename));
        if (!exists) {
          LOG.debug("{} does not exist.", filename);
        }
      }
    } else {
      throw new IOException("Unrecognized file system.");
    }
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);

    builder
        .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
            .withLabel("Temporary File Prefix"))
        .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
            .withLabel("Table Reference"))
        .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema)
            .withLabel("Table Schema"))
        .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
            .withLabel("Table Description"));
  }
}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index eca95b9..e7db8a2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -102,14 +102,10 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
 import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.TableRowWriter;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WritePartition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteRename;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTables;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import 
org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
@@ -654,7 +650,9 @@ public class BigQueryIOTest implements Serializable {
 
   @Rule public final transient TestPipeline p = TestPipeline.create();
   @Rule public transient ExpectedException thrown = ExpectedException.none();
-  @Rule public transient ExpectedLogs logged = 
ExpectedLogs.none(BigQueryIO.class);
+  @Rule public transient ExpectedLogs loggedBigQueryIO = 
ExpectedLogs.none(BigQueryIO.class);
+  @Rule public transient ExpectedLogs loggedWriteRename = 
ExpectedLogs.none(WriteRename.class);
+  @Rule public transient ExpectedLogs loggedWriteTables = 
ExpectedLogs.none(WriteTables.class);
   @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
   @Mock(extraInterfaces = Serializable.class)
   public transient BigQueryServices.JobService mockJobService;
@@ -1577,7 +1575,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testStreamingWriteFnCreateNever() throws Exception {
-    BigQueryIO.StreamingWriteFn fn = new BigQueryIO.StreamingWriteFn(
+    StreamingWriteFn fn = new StreamingWriteFn(
         null, CreateDisposition.CREATE_NEVER, null, new 
FakeBigQueryServices());
     assertEquals(BigQueryHelpers.parseTableSpec("dataset.table"),
         fn.getOrCreateTable(null, "dataset.table"));
@@ -2230,9 +2228,9 @@ public class BigQueryIOTest implements Serializable {
     testNumFiles(tempDir, 0);
 
     for (String fileName : fileNames) {
-      logged.verifyDebug("Removing file " + fileName);
+      loggedWriteTables.verifyDebug("Removing file " + fileName);
     }
-    logged.verifyDebug(fileNames.get(numFiles) + " does not exist.");
+    loggedWriteTables.verifyDebug(fileNames.get(numFiles) + " does not 
exist.");
   }
 
   @Test
@@ -2297,11 +2295,14 @@ public class BigQueryIOTest implements Serializable {
     WriteRename.removeTemporaryTables(mockDatasetService, tableRefs);
 
     for (TableReference ref : tableRefs) {
-      logged.verifyDebug("Deleting table " + toJsonString(ref));
+      loggedWriteRename.verifyDebug("Deleting table " + toJsonString(ref));
     }
-    logged.verifyWarn("Failed to delete the table " + 
toJsonString(tableRefs.get(0)));
-    logged.verifyNotLogged("Failed to delete the table " + 
toJsonString(tableRefs.get(1)));
-    logged.verifyNotLogged("Failed to delete the table " + 
toJsonString(tableRefs.get(2)));
+    loggedWriteRename.verifyWarn("Failed to delete the table "
+        + toJsonString(tableRefs.get(0)));
+    loggedWriteRename.verifyNotLogged("Failed to delete the table "
+        + toJsonString(tableRefs.get(1)));
+    loggedWriteRename.verifyNotLogged("Failed to delete the table "
+        + toJsonString(tableRefs.get(2)));
   }
 
   /** Test options. **/
@@ -2367,8 +2368,8 @@ public class BigQueryIOTest implements Serializable {
   public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() {
     BigQueryOptions bqOptions = 
PipelineOptionsFactory.as(BigQueryOptions.class);
     bqOptions.setProject("project");
-    BigQueryIO.TagWithUniqueIdsAndTable<TableRow> tag =
-        new BigQueryIO.TagWithUniqueIdsAndTable<TableRow>(
+    TagWithUniqueIdsAndTable<TableRow> tag =
+        new TagWithUniqueIdsAndTable<TableRow>(
             bqOptions, NestedValueProvider.of(
                 StaticValueProvider.of("data_set.table_name"),
                 new TableSpecToTableRef()), null, null);
@@ -2423,12 +2424,12 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() {
-    
CoderProperties.coderSerializable(BigQueryIO.ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE));
+    
CoderProperties.coderSerializable(ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE));
   }
 
   @Test
   public void testTableRowInfoCoderSerializable() {
-    CoderProperties.coderSerializable(BigQueryIO.TableRowInfoCoder.of());
+    CoderProperties.coderSerializable(TableRowInfoCoder.of());
   }
 
   @Test
@@ -2436,8 +2437,8 @@ public class BigQueryIOTest implements Serializable {
     CoderProperties.coderSerializable(
         WindowedValue.getFullCoder(
             KvCoder.of(
-                BigQueryIO.ShardedKeyCoder.of(StringUtf8Coder.of()),
-                BigQueryIO.TableRowInfoCoder.of()),
+                ShardedKeyCoder.of(StringUtf8Coder.of()),
+                TableRowInfoCoder.of()),
             IntervalWindow.getCoder()));
   }
 }

Reply via email to