http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java new file mode 100644 index 0000000..1d93fa3 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.values.KV; + +/** + * Implementation of DoFn to perform streaming BigQuery write. + */ +@SystemDoFnInternal +@VisibleForTesting +class StreamingWriteFn + extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { + /** TableSchema in JSON. Use String to make the class Serializable. */ + @Nullable + private final ValueProvider<String> jsonTableSchema; + + @Nullable private final String tableDescription; + + private final BigQueryServices bqServices; + + /** JsonTableRows to accumulate BigQuery rows in order to batch writes. 
*/ + private transient Map<String, List<TableRow>> tableRows; + + private final Write.CreateDisposition createDisposition; + + /** The list of unique ids for each BigQuery table row. */ + private transient Map<String, List<String>> uniqueIdsForTableRows; + + /** The list of tables created so far, so we don't try the creation + each time. */ + private static Set<String> createdTables = + Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + + /** Tracks bytes written, exposed as "ByteCount" Counter. */ + private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount"); + + /** Constructor. */ + StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema, + Write.CreateDisposition createDisposition, + @Nullable String tableDescription, BigQueryServices bqServices) { + this.jsonTableSchema = schema == null ? null : + NestedValueProvider.of(schema, new TableSchemaToJsonSchema()); + this.createDisposition = createDisposition; + this.bqServices = checkNotNull(bqServices, "bqServices"); + this.tableDescription = tableDescription; + } + + /** + * Clear the cached map of created tables. Used for testing. + */ + static void clearCreatedTables() { + synchronized (createdTables) { + createdTables.clear(); + } + } + + /** Prepares a target BigQuery table. */ + @StartBundle + public void startBundle(Context context) { + tableRows = new HashMap<>(); + uniqueIdsForTableRows = new HashMap<>(); + } + + /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. 
*/ + @ProcessElement + public void processElement(ProcessContext context) { + String tableSpec = context.element().getKey().getKey(); + List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec); + List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, + tableSpec); + + rows.add(context.element().getValue().tableRow); + uniqueIds.add(context.element().getValue().uniqueId); + } + + /** Writes the accumulated rows into BigQuery with streaming API. */ + @FinishBundle + public void finishBundle(Context context) throws Exception { + BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); + + for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) { + TableReference tableReference = getOrCreateTable(options, entry.getKey()); + flushRows(tableReference, entry.getValue(), + uniqueIdsForTableRows.get(entry.getKey()), options); + } + tableRows.clear(); + uniqueIdsForTableRows.clear(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull(DisplayData.item("schema", jsonTableSchema) + .withLabel("Table Schema")) + .addIfNotNull(DisplayData.item("tableDescription", tableDescription) + .withLabel("Table Description")); + } + + public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) + throws InterruptedException, IOException { + TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec); + if (createDisposition != createDisposition.CREATE_NEVER + && !createdTables.contains(tableSpec)) { + synchronized (createdTables) { + // Another thread may have succeeded in creating the table in the meanwhile, so + // check again. This check isn't needed for correctness, but we add it to prevent + // every thread from attempting a create and overwhelming our BigQuery quota. 
+ DatasetService datasetService = bqServices.getDatasetService(options); + if (!createdTables.contains(tableSpec)) { + if (datasetService.getTable(tableReference) == null) { + TableSchema tableSchema = BigQueryIO.JSON_FACTORY.fromString( + jsonTableSchema.get(), TableSchema.class); + datasetService.createTable( + new Table() + .setTableReference(tableReference) + .setSchema(tableSchema) + .setDescription(tableDescription)); + } + createdTables.add(tableSpec); + } + } + } + return tableReference; + } + + /** + * Writes the accumulated rows into BigQuery with streaming API. + */ + private void flushRows(TableReference tableReference, + List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) + throws InterruptedException { + if (!tableRows.isEmpty()) { + try { + long totalBytes = bqServices.getDatasetService(options).insertAll( + tableReference, tableRows, uniqueIds); + byteCounter.inc(totalBytes); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfo.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfo.java new file mode 100644 index 0000000..4a59ab6 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfo.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; + +/** + * Encapsulates a {@link TableRow} along with a unique insertion id. 
+ */ +class TableRowInfo { + TableRowInfo(TableRow tableRow, String uniqueId) { + this.tableRow = tableRow; + this.uniqueId = uniqueId; + } + + final TableRow tableRow; + final String uniqueId; +} http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java new file mode 100644 index 0000000..5e8fa29 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.TableRowJsonCoder; + +/** + * Defines a coder for {@link TableRowInfo} objects. + */ +@VisibleForTesting +class TableRowInfoCoder extends AtomicCoder<TableRowInfo> { + private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder(); + + @JsonCreator + public static TableRowInfoCoder of() { + return INSTANCE; + } + + @Override + public void encode(TableRowInfo value, OutputStream outStream, Context context) + throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null value"); + } + tableRowCoder.encode(value.tableRow, outStream, context.nested()); + idCoder.encode(value.uniqueId, outStream, context); + } + + @Override + public TableRowInfo decode(InputStream inStream, Context context) + throws IOException { + return new TableRowInfo( + tableRowCoder.decode(inStream, context.nested()), + idCoder.decode(inStream, context)); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, "TableRows are not deterministic."); + } + + TableRowJsonCoder tableRowCoder = TableRowJsonCoder.of(); + StringUtf8Coder idCoder = StringUtf8Coder.of(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
new file mode 100644
index 0000000..014c498
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -0,0 +1,84 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes {@link TableRow} objects out to a file. Used when doing batch load jobs into BigQuery.
+ */ +class TableRowWriter { + private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class); + + private static final Coder<TableRow> CODER = TableRowJsonCoder.of(); + private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private final String tempFilePrefix; + private String id; + private String fileName; + private WritableByteChannel channel; + protected String mimeType = MimeTypes.TEXT; + private CountingOutputStream out; + + TableRowWriter(String basename) { + this.tempFilePrefix = basename; + } + + public final void open(String uId) throws Exception { + id = uId; + fileName = tempFilePrefix + id; + LOG.debug("Opening {}.", fileName); + channel = IOChannelUtils.create(fileName, mimeType); + try { + out = new CountingOutputStream(Channels.newOutputStream(channel)); + LOG.debug("Writing header to {}.", fileName); + } catch (Exception e) { + try { + LOG.error("Writing header to {} failed, closing channel.", fileName); + channel.close(); + } catch (IOException closeException) { + LOG.error("Closing channel for {} failed", fileName); + } + throw e; + } + LOG.debug("Starting write of bundle {} to {}.", this.id, fileName); + } + + public void write(TableRow value) throws Exception { + CODER.encode(value, out, Context.OUTER); + out.write(NEWLINE); + } + + public final KV<String, Long> close() throws IOException { + channel.close(); + return KV.of(fileName, out.getCount()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java new file mode 100644 index 0000000..a6608e4 --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueInSingleWindow;

/**
 * Fn that tags each table row with a unique id and destination table.
 * To avoid calling UUID.randomUUID() for each element, which can be costly,
 * a randomUUID is generated only once per bucket of data. The actual unique
 * id is created by concatenating this randomUUID with a sequential number.
 */
@VisibleForTesting
class TagWithUniqueIdsAndTable<T>
    extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> {
  /**
   * Number of sharding keys per destination table; output keys are in [0, NUM_SHARDS).
   * Extracted from the previously inlined literal 50.
   */
  private static final int NUM_SHARDS = 50;

  /** TableSpec to write to. Null when a tableRefFunction is used instead. */
  private final ValueProvider<String> tableSpec;

  /** User function mapping windowed values to {@link TableReference} in JSON. */
  private final SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction;

  /** User function mapping user type to a TableRow. */
  private final SerializableFunction<T, TableRow> formatFunction;

  // Per-bundle UUID prefix plus a sequence number; see class javadoc.
  private transient String randomUUID;
  private transient long sequenceNo = 0L;

  TagWithUniqueIdsAndTable(BigQueryOptions options,
      ValueProvider<TableReference> table,
      SerializableFunction<ValueInSingleWindow<T>, TableReference>
          tableRefFunction,
      SerializableFunction<T, TableRow> formatFunction) {
    checkArgument(table == null ^ tableRefFunction == null,
        "Exactly one of table or tableRefFunction should be set");
    if (table != null) {
      // Fill in a missing project id from the pipeline options so downstream
      // consumers always see a fully-qualified table.
      if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
        TableReference tableRef = table.get()
            .setProjectId(options.as(BigQueryOptions.class).getProject());
        table = NestedValueProvider.of(
            StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
            new JsonTableRefToTableRef());
      }
      this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
    } else {
      tableSpec = null;
    }
    this.tableRefFunction = tableRefFunction;
    this.formatFunction = formatFunction;
  }


  @StartBundle
  public void startBundle(Context context) {
    randomUUID = UUID.randomUUID().toString();
  }

  /** Tag the input with a unique id. */
  @ProcessElement
  public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
    String uniqueId = randomUUID + sequenceNo++;
    ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
    String tableSpec = tableSpecFromWindowedValue(
        context.getPipelineOptions().as(BigQueryOptions.class),
        ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
    // We output on keys in [0, NUM_SHARDS) — nextInt's upper bound is exclusive — to
    // ensure that there's enough batching for BigQuery.
    context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, NUM_SHARDS)),
        new TableRowInfo(formatFunction.apply(context.element()), uniqueId)));
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);

    builder.addIfNotNull(DisplayData.item("table", tableSpec));
    if (tableRefFunction != null) {
      builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
          .withLabel("Table Reference Function"));
    }
  }

  @VisibleForTesting
  ValueProvider<String> getTableSpec() {
    return tableSpec;
  }

  /**
   * Resolves the destination table spec for one windowed element, either from the fixed
   * tableSpec or by applying tableRefFunction (defaulting the project id if absent).
   */
  private String tableSpecFromWindowedValue(BigQueryOptions options,
      ValueInSingleWindow<T> value) {
    if (tableSpec != null) {
      return tableSpec.get();
    } else {
      TableReference table = tableRefFunction.apply(value);
      if (table.getProjectId() == null) {
        table.setProjectId(options.getProject());
      }
      return BigQueryHelpers.toTableSpec(table);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java index a86adfb..f7d8252 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java new file mode 100644 index 0000000..6219226 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import java.util.UUID; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Writes each bundle of {@link TableRow} elements out to a separate file using + * {@link TableRowWriter}. + */ +class WriteBundles extends DoFn<TableRow, KV<String, Long>> { + private static final Logger LOG = LoggerFactory.getLogger(WriteBundles.class); + + private transient TableRowWriter writer = null; + private final String tempFilePrefix; + + WriteBundles(String tempFilePrefix) { + this.tempFilePrefix = tempFilePrefix; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + if (writer == null) { + writer = new TableRowWriter(tempFilePrefix); + writer.open(UUID.randomUUID().toString()); + LOG.debug("Done opening writer {}", writer); + } + try { + writer.write(c.element()); + } catch (Exception e) { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + // Do not mask the exception that caused the write to fail. 
+ e.addSuppressed(closeException); + } + throw e; + } + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + if (writer != null) { + c.output(writer.close()); + writer = null; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) + .withLabel("Temporary File Prefix")); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java new file mode 100644 index 0000000..f4bf198 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.common.collect.Lists; +import java.util.List; +import java.util.UUID; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Partitions temporary files based on number of files and file sizes. + */ +class WritePartition extends DoFn<String, KV<Long, List<String>>> { + private final PCollectionView<Iterable<KV<String, Long>>> resultsView; + private TupleTag<KV<Long, List<String>>> multiPartitionsTag; + private TupleTag<KV<Long, List<String>>> singlePartitionTag; + + public WritePartition( + PCollectionView<Iterable<KV<String, Long>>> resultsView, + TupleTag<KV<Long, List<String>>> multiPartitionsTag, + TupleTag<KV<Long, List<String>>> singlePartitionTag) { + this.resultsView = resultsView; + this.multiPartitionsTag = multiPartitionsTag; + this.singlePartitionTag = singlePartitionTag; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView)); + if (results.isEmpty()) { + TableRowWriter writer = new TableRowWriter(c.element()); + writer.open(UUID.randomUUID().toString()); + results.add(writer.close()); + } + + long partitionId = 0; + int currNumFiles = 0; + long currSizeBytes = 0; + List<String> currResults = Lists.newArrayList(); + for (int i = 0; i < results.size(); ++i) { + KV<String, Long> fileResult = results.get(i); + if (currNumFiles + 1 > Write.MAX_NUM_FILES + || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) { + c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); + currResults = Lists.newArrayList(); + currNumFiles = 0; + currSizeBytes = 0; + } + ++currNumFiles; + currSizeBytes += fileResult.getValue(); + currResults.add(fileResult.getKey()); + } + 
if (partitionId == 0) { + c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults)); + } else { + c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java new file mode 100644 index 0000000..8cb9439 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.TableReference; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollectionView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Copies temporary tables to destination table. 
+ */ +class WriteRename extends DoFn<String, Void> { + private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class); + + private final BigQueryServices bqServices; + private final PCollectionView<String> jobIdToken; + private final ValueProvider<String> jsonTableRef; + private final WriteDisposition writeDisposition; + private final CreateDisposition createDisposition; + private final PCollectionView<Iterable<String>> tempTablesView; + @Nullable + private final String tableDescription; + + public WriteRename( + BigQueryServices bqServices, + PCollectionView<String> jobIdToken, + ValueProvider<String> jsonTableRef, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + PCollectionView<Iterable<String>> tempTablesView, + @Nullable String tableDescription) { + this.bqServices = bqServices; + this.jobIdToken = jobIdToken; + this.jsonTableRef = jsonTableRef; + this.writeDisposition = writeDisposition; + this.createDisposition = createDisposition; + this.tempTablesView = tempTablesView; + this.tableDescription = tableDescription; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView)); + + // Do not copy if no temp tables are provided + if (tempTablesJson.size() == 0) { + return; + } + + List<TableReference> tempTables = Lists.newArrayList(); + for (String table : tempTablesJson) { + tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class)); + } + copy( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), + c.sideInput(jobIdToken), + BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class), + tempTables, + writeDisposition, + createDisposition, + tableDescription); + + DatasetService tableService = + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); + 
removeTemporaryTables(tableService, tempTables); + } + + private void copy( + JobService jobService, + DatasetService datasetService, + String jobIdPrefix, + TableReference ref, + List<TableReference> tempTables, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + @Nullable String tableDescription) throws InterruptedException, IOException { + JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy() + .setSourceTables(tempTables) + .setDestinationTable(ref) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()); + + String projectId = ref.getProjectId(); + Job lastFailedCopyJob = null; + for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) { + String jobId = jobIdPrefix + "-" + i; + JobReference jobRef = new JobReference() + .setProjectId(projectId) + .setJobId(jobId); + jobService.startCopyJob(jobRef, copyConfig); + Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); + Status jobStatus = BigQueryHelpers.parseStatus(copyJob); + switch (jobStatus) { + case SUCCEEDED: + if (tableDescription != null) { + datasetService.patchTableDescription(ref, tableDescription); + } + return; + case UNKNOWN: + throw new RuntimeException(String.format( + "UNKNOWN status of copy job [%s]: %s.", jobId, + BigQueryHelpers.jobToPrettyString(copyJob))); + case FAILED: + lastFailedCopyJob = copyJob; + continue; + default: + throw new IllegalStateException(String.format( + "Unexpected status [%s] of load job: %s.", + jobStatus, BigQueryHelpers.jobToPrettyString(copyJob))); + } + } + throw new RuntimeException(String.format( + "Failed to create copy job with id prefix %s, " + + "reached max retries: %d, last failed copy job: %s.", + jobIdPrefix, + Write.MAX_RETRY_JOBS, + BigQueryHelpers.jobToPrettyString(lastFailedCopyJob))); + } + + static void removeTemporaryTables(DatasetService tableService, + List<TableReference> tempTables) { + for (TableReference tableRef : tempTables) { + try { + 
LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef)); + tableService.deleteTable(tableRef); + } catch (Exception e) { + LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e); + } + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) + .withLabel("Table Reference")) + .add(DisplayData.item("writeDisposition", writeDisposition.toString()) + .withLabel("Write Disposition")) + .add(DisplayData.item("createDisposition", createDisposition.toString()) + .withLabel("Create Disposition")); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java new file mode 100644 index 0000000..29680ad --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.FileIOChannelFactory; +import org.apache.beam.sdk.util.GcsIOChannelFactory; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; +import org.apache.beam.sdk.util.IOChannelFactory; +import 
org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Writes partitions to BigQuery tables.
 *
 * <p>Each input element is one partition of temporary files. The partition is loaded into a
 * BigQuery table — the destination table itself when {@code singlePartition} is true, otherwise
 * a per-partition temporary table named after the job id — and the JSON-serialized
 * {@link TableReference} of the loaded table is emitted so a later step can copy/rename it.
 * The partition's temporary files are deleted after a successful load.
 */
class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
  private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);

  // True when the whole write fits in one partition, so we can load straight into the
  // destination table and skip the temp-table + copy step.
  private final boolean singlePartition;
  private final BigQueryServices bqServices;
  private final PCollectionView<String> jobIdToken;
  private final String tempFilePrefix;
  // JSON-serialized destination TableReference.
  private final ValueProvider<String> jsonTableRef;
  // JSON-serialized TableSchema; may be null when no schema is supplied.
  private final ValueProvider<String> jsonSchema;
  private final WriteDisposition writeDisposition;
  private final CreateDisposition createDisposition;
  @Nullable
  private final String tableDescription;

  public WriteTables(
      boolean singlePartition,
      BigQueryServices bqServices,
      PCollectionView<String> jobIdToken,
      String tempFilePrefix,
      ValueProvider<String> jsonTableRef,
      ValueProvider<String> jsonSchema,
      WriteDisposition writeDisposition,
      CreateDisposition createDisposition,
      @Nullable String tableDescription) {
    this.singlePartition = singlePartition;
    this.bqServices = bqServices;
    this.jobIdToken = jobIdToken;
    this.tempFilePrefix = tempFilePrefix;
    this.jsonTableRef = jsonTableRef;
    this.jsonSchema = jsonSchema;
    this.writeDisposition = writeDisposition;
    this.createDisposition = createDisposition;
    this.tableDescription = tableDescription;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    // The grouped value contains exactly one file list per partition key.
    List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
    // Use the token as a format argument, not as the format string, so a '%' in the token
    // cannot break (or inject into) the formatting.
    String jobIdPrefix = String.format(
        "%s_%05d", c.sideInput(jobIdToken), c.element().getKey());
    TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(),
        TableReference.class);
    if (!singlePartition) {
      // Multi-partition mode: load into a uniquely-named temp table; WriteRename copies later.
      ref.setTableId(jobIdPrefix);
    }

    load(
        bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
        jobIdPrefix,
        ref,
        BigQueryHelpers.fromJsonString(
            jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
        partition,
        writeDisposition,
        createDisposition,
        tableDescription);
    c.output(BigQueryHelpers.toJsonString(ref));

    removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
  }

  /**
   * Issues a BigQuery load job from {@code gcsUris} into {@code ref}, retrying with a fresh
   * job id up to {@link Write#MAX_RETRY_JOBS} times on FAILED status.
   *
   * @throws RuntimeException on UNKNOWN status or when all retries fail
   */
  private void load(
      JobService jobService,
      DatasetService datasetService,
      String jobIdPrefix,
      TableReference ref,
      @Nullable TableSchema schema,
      List<String> gcsUris,
      WriteDisposition writeDisposition,
      CreateDisposition createDisposition,
      @Nullable String tableDescription) throws InterruptedException, IOException {
    JobConfigurationLoad loadConfig = new JobConfigurationLoad()
        .setDestinationTable(ref)
        .setSchema(schema)
        .setSourceUris(gcsUris)
        .setWriteDisposition(writeDisposition.name())
        .setCreateDisposition(createDisposition.name())
        .setSourceFormat("NEWLINE_DELIMITED_JSON");

    String projectId = ref.getProjectId();
    Job lastFailedLoadJob = null;
    for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
      // A new job id per attempt: BigQuery job ids must be unique within a project.
      String jobId = jobIdPrefix + "-" + i;
      JobReference jobRef = new JobReference()
          .setProjectId(projectId)
          .setJobId(jobId);
      jobService.startLoadJob(jobRef, loadConfig);
      Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
      Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
      switch (jobStatus) {
        case SUCCEEDED:
          if (tableDescription != null) {
            datasetService.patchTableDescription(ref, tableDescription);
          }
          return;
        case UNKNOWN:
          throw new RuntimeException(String.format(
              "UNKNOWN status of load job [%s]: %s.", jobId,
              BigQueryHelpers.jobToPrettyString(loadJob)));
        case FAILED:
          lastFailedLoadJob = loadJob;
          continue;
        default:
          throw new IllegalStateException(String.format(
              "Unexpected status [%s] of load job: %s.",
              jobStatus, BigQueryHelpers.jobToPrettyString(loadJob)));
      }
    }
    throw new RuntimeException(String.format(
        "Failed to create load job with id prefix %s, "
            + "reached max retries: %d, last failed load job: %s.",
        jobIdPrefix,
        Write.MAX_RETRY_JOBS,
        BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
  }

  /** Deletes the given temporary files via the channel factory matching {@code tempFilePrefix}. */
  static void removeTemporaryFiles(
      PipelineOptions options,
      String tempFilePrefix,
      Collection<String> files)
      throws IOException {
    IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
    if (factory instanceof GcsIOChannelFactory) {
      // GCS supports bulk removal in one call.
      GcsUtil gcsUtil = new GcsUtilFactory().create(options);
      gcsUtil.remove(files);
    } else if (factory instanceof FileIOChannelFactory) {
      for (String filename : files) {
        LOG.debug("Removing file {}", filename);
        // deleteIfExists returns whether it deleted the file; renamed from the misleading
        // "exists" — false means the file was already gone.
        boolean deleted = Files.deleteIfExists(Paths.get(filename));
        if (!deleted) {
          LOG.debug("{} does not exist.", filename);
        }
      }
    } else {
      throw new IOException("Unrecognized file system.");
    }
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);

    builder
        .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
            .withLabel("Temporary File Prefix"))
        .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
            .withLabel("Table Reference"))
        .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema)
            .withLabel("Table Schema"))
        .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
            .withLabel("Table Description"));
  }
}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index eca95b9..e7db8a2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -102,14 +102,10 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.TableRowWriter; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WritePartition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteRename; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTables; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation; @@ -654,7 +650,9 @@ public class BigQueryIOTest implements Serializable { @Rule public final transient TestPipeline p = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); - @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); + @Rule public transient ExpectedLogs loggedBigQueryIO = ExpectedLogs.none(BigQueryIO.class); + @Rule public transient ExpectedLogs loggedWriteRename = ExpectedLogs.none(WriteRename.class); + 
@Rule public transient ExpectedLogs loggedWriteTables = ExpectedLogs.none(WriteTables.class); @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); @Mock(extraInterfaces = Serializable.class) public transient BigQueryServices.JobService mockJobService; @@ -1577,7 +1575,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testStreamingWriteFnCreateNever() throws Exception { - BigQueryIO.StreamingWriteFn fn = new BigQueryIO.StreamingWriteFn( + StreamingWriteFn fn = new StreamingWriteFn( null, CreateDisposition.CREATE_NEVER, null, new FakeBigQueryServices()); assertEquals(BigQueryHelpers.parseTableSpec("dataset.table"), fn.getOrCreateTable(null, "dataset.table")); @@ -2230,9 +2228,9 @@ public class BigQueryIOTest implements Serializable { testNumFiles(tempDir, 0); for (String fileName : fileNames) { - logged.verifyDebug("Removing file " + fileName); + loggedWriteTables.verifyDebug("Removing file " + fileName); } - logged.verifyDebug(fileNames.get(numFiles) + " does not exist."); + loggedWriteTables.verifyDebug(fileNames.get(numFiles) + " does not exist."); } @Test @@ -2297,11 +2295,14 @@ public class BigQueryIOTest implements Serializable { WriteRename.removeTemporaryTables(mockDatasetService, tableRefs); for (TableReference ref : tableRefs) { - logged.verifyDebug("Deleting table " + toJsonString(ref)); + loggedWriteRename.verifyDebug("Deleting table " + toJsonString(ref)); } - logged.verifyWarn("Failed to delete the table " + toJsonString(tableRefs.get(0))); - logged.verifyNotLogged("Failed to delete the table " + toJsonString(tableRefs.get(1))); - logged.verifyNotLogged("Failed to delete the table " + toJsonString(tableRefs.get(2))); + loggedWriteRename.verifyWarn("Failed to delete the table " + + toJsonString(tableRefs.get(0))); + loggedWriteRename.verifyNotLogged("Failed to delete the table " + + toJsonString(tableRefs.get(1))); + loggedWriteRename.verifyNotLogged("Failed to delete the table " + + 
toJsonString(tableRefs.get(2))); } /** Test options. **/ @@ -2367,8 +2368,8 @@ public class BigQueryIOTest implements Serializable { public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() { BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); bqOptions.setProject("project"); - BigQueryIO.TagWithUniqueIdsAndTable<TableRow> tag = - new BigQueryIO.TagWithUniqueIdsAndTable<TableRow>( + TagWithUniqueIdsAndTable<TableRow> tag = + new TagWithUniqueIdsAndTable<TableRow>( bqOptions, NestedValueProvider.of( StaticValueProvider.of("data_set.table_name"), new TableSpecToTableRef()), null, null); @@ -2423,12 +2424,12 @@ public class BigQueryIOTest implements Serializable { @Test public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() { - CoderProperties.coderSerializable(BigQueryIO.ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE)); + CoderProperties.coderSerializable(ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE)); } @Test public void testTableRowInfoCoderSerializable() { - CoderProperties.coderSerializable(BigQueryIO.TableRowInfoCoder.of()); + CoderProperties.coderSerializable(TableRowInfoCoder.of()); } @Test @@ -2436,8 +2437,8 @@ public class BigQueryIOTest implements Serializable { CoderProperties.coderSerializable( WindowedValue.getFullCoder( KvCoder.of( - BigQueryIO.ShardedKeyCoder.of(StringUtf8Coder.of()), - BigQueryIO.TableRowInfoCoder.of()), + ShardedKeyCoder.of(StringUtf8Coder.of()), + TableRowInfoCoder.of()), IntervalWindow.getCoder())); } }
