[ https://issues.apache.org/jira/browse/BEAM-4824?focusedWorklogId=175843&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-175843 ]
ASF GitHub Bot logged work on BEAM-4824:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Dec/18 18:13
Start Date: 16/Dec/18 18:13
Worklog Time Spent: 10m
Work Description: stale[bot] closed pull request #6055: [BEAM-4824] Batch
BigQueryIO returns job results
URL: https://github.com/apache/beam/pull/6055
As this is a foreign pull request (from a fork) and GitHub hides the original diff once the pull request is closed, the diff is reproduced below for the sake of provenance:
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index aebe0bc8e55f..2d7a8107fe85 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -26,6 +26,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
@@ -69,7 +70,6 @@
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -283,9 +283,18 @@ private WriteResult
expandTriggered(PCollection<KV<DestinationT, TableRow>> inpu
singlePartitionTag))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag,
TupleTagList.of(singlePartitionTag)));
- PCollection<KV<TableDestination, String>> tempTables =
+ PCollection<KV<TableDestination, BigQueryWriteResult>> tempTables =
writeTempTables(partitions.get(multiPartitionsTag),
loadJobIdPrefixView);
tempTables
+ .apply(
+ ParDo.of(
+ new DoFn<
+ KV<TableDestination, BigQueryWriteResult>,
KV<TableDestination, String>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().getKey(),
c.element().getValue().getTable()));
+ }
+ }))
// Now that the load job has happened, we want the rename to happen
immediately.
.apply(
Window.<KV<TableDestination, String>>into(new GlobalWindows())
@@ -306,8 +315,9 @@ private WriteResult
expandTriggered(PCollection<KV<DestinationT, TableRow>> inpu
createDisposition,
maxRetryJobs))
.withSideInputs(loadJobIdPrefixView));
- writeSinglePartition(partitions.get(singlePartitionTag),
loadJobIdPrefixView);
- return writeResult(p);
+ PCollection<KV<TableDestination, BigQueryWriteResult>>
singlePartitionResults =
+ writeSinglePartition(partitions.get(singlePartitionTag),
loadJobIdPrefixView);
+ return writeResult(p,
PCollectionList.of(Arrays.asList(singlePartitionResults, tempTables)));
}
// Expand the pipeline when the user has not requested
periodically-triggered file writes.
@@ -350,10 +360,19 @@ public WriteResult
expandUntriggered(PCollection<KV<DestinationT, TableRow>> inp
singlePartitionTag))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag,
TupleTagList.of(singlePartitionTag)));
- PCollection<KV<TableDestination, String>> tempTables =
+ PCollection<KV<TableDestination, BigQueryWriteResult>> tempTables =
writeTempTables(partitions.get(multiPartitionsTag),
loadJobIdPrefixView);
tempTables
+ .apply(
+ ParDo.of(
+ new DoFn<
+ KV<TableDestination, BigQueryWriteResult>,
KV<TableDestination, String>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().getKey(),
c.element().getValue().getTable()));
+ }
+ }))
.apply("ReifyRenameInput", new ReifyAsIterable<>())
.setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(),
StringUtf8Coder.of())))
.apply(
@@ -366,8 +385,9 @@ public WriteResult
expandUntriggered(PCollection<KV<DestinationT, TableRow>> inp
createDisposition,
maxRetryJobs))
.withSideInputs(loadJobIdPrefixView));
- writeSinglePartition(partitions.get(singlePartitionTag),
loadJobIdPrefixView);
- return writeResult(p);
+ PCollection<KV<TableDestination, BigQueryWriteResult>>
singlePartitionResults =
+ writeSinglePartition(partitions.get(singlePartitionTag),
loadJobIdPrefixView);
+ return writeResult(p,
PCollectionList.of(Arrays.asList(singlePartitionResults, tempTables)));
}
// Generate the base job id string.
@@ -505,7 +525,7 @@ public void processElement(ProcessContext c) {
}
// Take in a list of files and write them to temporary tables.
- private PCollection<KV<TableDestination, String>> writeTempTables(
+ private PCollection<KV<TableDestination, BigQueryWriteResult>>
writeTempTables(
PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
PCollectionView<String> jobIdTokenView) {
List<PCollectionView<?>> sideInputs = Lists.newArrayList(jobIdTokenView);
@@ -541,7 +561,7 @@ public void processElement(ProcessContext c) {
// In the case where the files fit into a single load job, there's no need
to write temporary
// tables and rename. We can load these files directly into the target
BigQuery table.
- void writeSinglePartition(
+ PCollection<KV<TableDestination, BigQueryWriteResult>> writeSinglePartition(
PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
PCollectionView<String> loadJobIdPrefixView) {
List<PCollectionView<?>> sideInputs =
Lists.newArrayList(loadJobIdPrefixView);
@@ -551,7 +571,7 @@ void writeSinglePartition(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
ListCoder.of(StringUtf8Coder.of()));
// Write single partition to final table
- input
+ return input
.setCoder(partitionsCoder)
// Reshuffle will distribute this among multiple workers, and also
guard against
// reexecution of the WritePartitions step once WriteTables has begun.
@@ -571,10 +591,10 @@ void writeSinglePartition(
ignoreUnknownValues));
}
- private WriteResult writeResult(Pipeline p) {
- PCollection<TableRow> empty =
- p.apply("CreateEmptyFailedInserts",
Create.empty(TypeDescriptor.of(TableRow.class)));
- return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
+ private WriteResult writeResult(
+ Pipeline p, PCollectionList<KV<TableDestination, BigQueryWriteResult>>
results) {
+ return WriteResult.withLoadResults(
+ p, new TupleTag<>("loadJobResults"),
results.apply(Flatten.pCollections()));
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryWriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryWriteResult.java
new file mode 100644
index 000000000000..683b2430e54a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryWriteResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.util.Objects;
+
+/**
+ * Model definition for BigQueryWriteResult.
+ *
+ * <p>This class represents the result of a batch load job against BQ.
+ */
+public class BigQueryWriteResult {
+
+ private final BigQueryHelpers.Status status;
+ private final String table;
+
+ BigQueryWriteResult(BigQueryHelpers.Status status, String table) {
+ this.status = status;
+ this.table = table;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public BigQueryHelpers.Status getStatus() {
+ return status;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BigQueryWriteResult that = (BigQueryWriteResult) o;
+ return status == that.status && Objects.equals(table, that.table);
+ }
+
+ @Override
+ public int hashCode() {
+
+ return Objects.hash(status, table);
+ }
+
+ @Override
+ public String toString() {
+ return "BigQueryWriteResult{ status: " + status.name() + ", table: " +
table + "}.";
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryWriteResultCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryWriteResultCoder.java
new file mode 100644
index 000000000000..df3f7e24468b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryWriteResultCoder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/** A {@link Coder} that encodes BigQuery {@link BigQueryWriteResult} objects.
*/
+public class BigQueryWriteResultCoder extends AtomicCoder<BigQueryWriteResult>
{
+
+ private static final BigQueryWriteResultCoder INSTANCE = new
BigQueryWriteResultCoder();
+
+ public static BigQueryWriteResultCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(BigQueryWriteResult value, OutputStream outStream) throws
IOException {
+ StringUtf8Coder.of().encode(value.getStatus().name(), outStream);
+ StringUtf8Coder.of().encode(value.getTable(), outStream);
+ }
+
+ @Override
+ public BigQueryWriteResult decode(InputStream inStream) throws IOException {
+ return new BigQueryWriteResult(
+ BigQueryHelpers.Status.valueOf(StringUtf8Coder.of().decode(inStream)),
+ StringUtf8Coder.of().decode(inStream));
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index 72c49d12c9d4..8ca7a9d77c9b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
@@ -37,26 +38,37 @@
private final PCollection<TableRow> failedInserts;
private final TupleTag<BigQueryInsertError> failedInsertsWithErrTag;
private final PCollection<BigQueryInsertError> failedInsertsWithErr;
+ private final TupleTag<KV<TableDestination, BigQueryWriteResult>>
loadResultsTag;
+ private final PCollection<KV<TableDestination, BigQueryWriteResult>>
loadResults;
/** Creates a {@link WriteResult} in the given {@link Pipeline}. */
static WriteResult in(
Pipeline pipeline, TupleTag<TableRow> failedInsertsTag,
PCollection<TableRow> failedInserts) {
- return new WriteResult(pipeline, failedInsertsTag, failedInserts, null,
null);
+ return new WriteResult(pipeline, failedInsertsTag, failedInserts, null,
null, null, null);
}
static WriteResult withExtendedErrors(
Pipeline pipeline,
TupleTag<BigQueryInsertError> failedInsertsTag,
PCollection<BigQueryInsertError> failedInserts) {
- return new WriteResult(pipeline, null, null, failedInsertsTag,
failedInserts);
+ return new WriteResult(pipeline, null, null, failedInsertsTag,
failedInserts, null, null);
+ }
+
+ static WriteResult withLoadResults(
+ Pipeline pipeline,
+ TupleTag<KV<TableDestination, BigQueryWriteResult>> loadResultsTag,
+ PCollection<KV<TableDestination, BigQueryWriteResult>> loadResults) {
+ return new WriteResult(pipeline, null, null, null, null, loadResultsTag,
loadResults);
}
@Override
public Map<TupleTag<?>, PValue> expand() {
if (failedInsertsTag != null) {
return ImmutableMap.of(failedInsertsTag, failedInserts);
- } else {
+ } else if (failedInsertsWithErrTag != null) {
return ImmutableMap.of(failedInsertsWithErrTag, failedInsertsWithErr);
+ } else {
+ return ImmutableMap.of(loadResultsTag, loadResults);
}
}
@@ -65,12 +77,16 @@ private WriteResult(
TupleTag<TableRow> failedInsertsTag,
PCollection<TableRow> failedInserts,
TupleTag<BigQueryInsertError> failedInsertsWithErrTag,
- PCollection<BigQueryInsertError> failedInsertsWithErr) {
+ PCollection<BigQueryInsertError> failedInsertsWithErr,
+ TupleTag<KV<TableDestination, BigQueryWriteResult>> loadResultsTag,
+ PCollection<KV<TableDestination, BigQueryWriteResult>> loadResults) {
this.pipeline = pipeline;
this.failedInsertsTag = failedInsertsTag;
this.failedInserts = failedInserts;
this.failedInsertsWithErrTag = failedInsertsWithErrTag;
this.failedInsertsWithErr = failedInsertsWithErr;
+ this.loadResultsTag = loadResultsTag;
+ this.loadResults = loadResults;
}
/**
@@ -103,6 +119,17 @@ private WriteResult(
return failedInsertsWithErr;
}
+ /**
+ * Returns a {@link PCollection} containing pairs of {@link
TableDestination} -> {@link
+ * BigQueryWriteResult} indicating the result of the load job per table
destination.
+ */
+ public PCollection<KV<TableDestination, BigQueryWriteResult>>
getLoadJobResults() {
+ checkArgument(
+ loadResultsTag != null,
+ "Cannot use getLoadJobResults as this WriteResult was not configured
to represent a load jobs result");
+ return loadResults;
+ }
+
@Override
public Pipeline getPipeline() {
return pipeline;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index a509fa2eda71..2a0fc18cfdce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -88,7 +88,7 @@
class WriteTables<DestinationT>
extends PTransform<
PCollection<KV<ShardedKey<DestinationT>, List<String>>>,
- PCollection<KV<TableDestination, String>>> {
+ PCollection<KV<TableDestination, BigQueryWriteResult>>> {
private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
private final boolean singlePartition;
@@ -98,14 +98,15 @@
private final CreateDisposition firstPaneCreateDisposition;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private final List<PCollectionView<?>> sideInputs;
- private final TupleTag<KV<TableDestination, String>> mainOutputTag;
+ private final TupleTag<KV<TableDestination, BigQueryWriteResult>>
mainOutputTag;
private final TupleTag<String> temporaryFilesTag;
private final ValueProvider<String> loadJobProjectId;
private final int maxRetryJobs;
private final boolean ignoreUnknownValues;
private class WriteTablesDoFn
- extends DoFn<KV<ShardedKey<DestinationT>, List<String>>,
KV<TableDestination, String>> {
+ extends DoFn<
+ KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination,
BigQueryWriteResult>> {
private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
@StartBundle
@@ -166,19 +167,19 @@ public void processElement(ProcessContext c) throws
Exception {
(c.pane().getIndex() == 0) ? firstPaneWriteDisposition :
WriteDisposition.WRITE_APPEND;
CreateDisposition createDisposition =
(c.pane().getIndex() == 0) ? firstPaneCreateDisposition :
CreateDisposition.CREATE_NEVER;
- load(
-
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
- jobIdPrefix,
- tableReference,
- tableDestination.getTimePartitioning(),
- tableSchema,
- partitionFiles,
- writeDisposition,
- createDisposition,
- tableDestination.getTableDescription());
- c.output(
- mainOutputTag, KV.of(tableDestination,
BigQueryHelpers.toJsonString(tableReference)));
+ BigQueryWriteResult result =
+ load(
+
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+ jobIdPrefix,
+ tableReference,
+ tableDestination.getTimePartitioning(),
+ tableSchema,
+ partitionFiles,
+ writeDisposition,
+ createDisposition,
+ tableDestination.getTableDescription());
+ c.output(mainOutputTag, KV.of(tableDestination, result));
for (String file : partitionFiles) {
c.output(temporaryFilesTag, file);
}
@@ -218,7 +219,7 @@ public WriteTables(
}
@Override
- public PCollection<KV<TableDestination, String>> expand(
+ public PCollection<KV<TableDestination, BigQueryWriteResult>> expand(
PCollection<KV<ShardedKey<DestinationT>, List<String>>> input) {
PCollectionTuple writeTablesOutputs =
input.apply(
@@ -245,10 +246,12 @@ public WriteTables(
.apply(Values.create())
.apply(ParDo.of(new GarbageCollectTemporaryFiles()));
- return writeTablesOutputs.get(mainOutputTag);
+ return writeTablesOutputs
+ .get(mainOutputTag)
+ .setCoder(KvCoder.of(TableDestinationCoderV2.of(),
BigQueryWriteResultCoder.of()));
}
- private void load(
+ private BigQueryWriteResult load(
JobService jobService,
DatasetService datasetService,
String jobIdPrefix,
@@ -328,7 +331,7 @@ private void load(
ref.clone().setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
tableDescription);
}
- return;
+ return new BigQueryWriteResult(Status.SUCCEEDED,
BigQueryHelpers.toJsonString(ref));
case UNKNOWN:
// This might happen if BigQuery's job listing is slow. Retry with
the same
// job id.
@@ -350,17 +353,10 @@ private void load(
jobId);
continue;
default:
- throw new IllegalStateException(
- String.format(
- "Unexpected status [%s] of load job: %s.",
- loadJob.getStatus(),
BigQueryHelpers.jobToPrettyString(loadJob)));
+ return new BigQueryWriteResult(jobStatus,
BigQueryHelpers.toJsonString(ref));
}
} while (nextBackOff(sleeper, backoff));
- throw new RuntimeException(
- String.format(
- "Failed to create load job with id prefix %s, "
- + "reached max retries: %d, last failed load job: %s.",
- jobIdPrefix, maxRetryJobs,
BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
+ return new
BigQueryWriteResult(BigQueryHelpers.parseStatus(lastFailedLoadJob),
BigQueryHelpers.toJsonString(ref));
}
/** Identical to {@link BackOffUtils#next} but without checked IOException.
*/
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 971fed8d52fe..beb09e79014d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static java.util.stream.Collectors.toList;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.allOf;
@@ -64,9 +65,11 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -77,8 +80,10 @@
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
@@ -115,6 +120,11 @@
/** Tests for {@link BigQueryIO#write}. */
@RunWith(JUnit4.class)
public class BigQueryIOWriteTest implements Serializable {
+ private static final String DATASET_ID = "dataset-id";
+ private static final String PROJECT_ID = "project-id";
+ private static final String TABLE_ID = "table-id";
+ private static final String TABLE_SPEC = PROJECT_ID + ":" + DATASET_ID + "."
+ TABLE_ID;
+
private transient PipelineOptions options;
private transient TemporaryFolder testFolder = new TemporaryFolder();
private transient TestPipeline p;
@@ -133,7 +143,7 @@ public Statement apply(final Statement base, final
Description description) {
@Override
public void evaluate() throws Throwable {
options = TestPipeline.testingPipelineOptions();
- options.as(BigQueryOptions.class).setProject("project-id");
+ options.as(BigQueryOptions.class).setProject(PROJECT_ID);
options
.as(BigQueryOptions.class)
.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -160,7 +170,7 @@ public void setUp() throws IOException,
InterruptedException {
FakeDatasetService.setUp();
BigQueryIO.clearCreatedTables();
- fakeDatasetService.createDataset("project-id", "dataset-id", "", "", null);
+ fakeDatasetService.createDataset(PROJECT_ID, DATASET_ID, "", "", null);
}
@After
@@ -178,20 +188,23 @@ public void testWriteEmptyPCollection() throws Exception {
.setFields(
ImmutableList.of(new
TableFieldSchema().setName("number").setType("INTEGER")));
- p.apply(Create.empty(TableRowJsonCoder.of()))
- .apply(
- BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id")
- .withTestServices(fakeBqServices)
-
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
-
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withSchema(schema)
- .withoutValidation());
+ WriteResult wr =
+ p.apply(Create.empty(TableRowJsonCoder.of()))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(TABLE_SPEC)
+ .withTestServices(fakeBqServices)
+
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withSchema(schema)
+ .withoutValidation());
+
+ PAssert.that(wr.getLoadJobResults())
+
.containsInAnyOrder(buildKVWithResult(BigQueryHelpers.Status.SUCCEEDED,
TABLE_SPEC));
+
p.run();
- checkNotNull(
- fakeDatasetService.getTable(
- BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id")));
+
checkNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(TABLE_SPEC)));
}
@Test
@@ -237,73 +250,75 @@ public void writeDynamicDestinations(boolean streaming)
throws Exception {
// Use a partition decorator to verify that partition decorators are
supported.
final String partitionDecorator = "20171127";
- users.apply(
- "WriteBigQuery",
- BigQueryIO.<String>write()
- .withTestServices(fakeBqServices)
- .withMaxFilesPerBundle(5)
- .withMaxFileSize(10)
-
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withFormatFunction(
- user -> {
- Matcher matcher = userPattern.matcher(user);
- if (matcher.matches()) {
- return new TableRow()
- .set("name", matcher.group(1))
- .set("id", Integer.valueOf(matcher.group(2)));
- }
- throw new RuntimeException("Unmatching element " + user);
- })
- .to(
- new StringIntegerDestinations() {
- @Override
- public Integer getDestination(ValueInSingleWindow<String>
element) {
- assertThat(
- element.getWindow(),
Matchers.instanceOf(PartitionedGlobalWindow.class));
- Matcher matcher = userPattern.matcher(element.getValue());
- if (matcher.matches()) {
- // Since we name tables by userid, we can simply store
an Integer to represent
- // a table.
- return Integer.valueOf(matcher.group(2));
- }
- throw new RuntimeException("Unmatching destination " +
element.getValue());
- }
+ WriteResult wr =
+ users.apply(
+ "WriteBigQuery",
+ BigQueryIO.<String>write()
+ .withTestServices(fakeBqServices)
+ .withMaxFilesPerBundle(5)
+ .withMaxFileSize(10)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withFormatFunction(
+ user -> {
+ Matcher matcher = userPattern.matcher(user);
+ if (matcher.matches()) {
+ return new TableRow()
+ .set("name", matcher.group(1))
+ .set("id", Integer.valueOf(matcher.group(2)));
+ }
+ throw new RuntimeException("Unmatching element " + user);
+ })
+ .to(
+ new StringIntegerDestinations() {
+ @Override
+ public Integer
getDestination(ValueInSingleWindow<String> element) {
+ assertThat(
+ element.getWindow(),
+
Matchers.instanceOf(PartitionedGlobalWindow.class));
+ Matcher matcher =
userPattern.matcher(element.getValue());
+ if (matcher.matches()) {
+ // Since we name tables by userid, we can simply
store an Integer to
+ // represent
+ // a table.
+ return Integer.valueOf(matcher.group(2));
+ }
+ throw new RuntimeException("Unmatching destination " +
element.getValue());
+ }
- @Override
- public TableDestination getTable(Integer userId) {
- verifySideInputs();
- // Each user in it's own table.
- return new TableDestination(
- "dataset-id.userid-" + userId + "$" +
partitionDecorator,
- "table for userid " + userId);
- }
+ @Override
+ public TableDestination getTable(Integer userId) {
+ verifySideInputs();
+ // Each user in it's own table.
+ return new TableDestination(
+ DATASET_ID + ".userid-" + userId + "$" +
partitionDecorator,
+ "table for userid " + userId);
+ }
- @Override
- public TableSchema getSchema(Integer userId) {
- verifySideInputs();
- return new TableSchema()
- .setFields(
- ImmutableList.of(
- new
TableFieldSchema().setName("name").setType("STRING"),
- new
TableFieldSchema().setName("id").setType("INTEGER")));
- }
+ @Override
+ public TableSchema getSchema(Integer userId) {
+ verifySideInputs();
+ return new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("name").setType("STRING"),
+ new
TableFieldSchema().setName("id").setType("INTEGER")));
+ }
- @Override
- public List<PCollectionView<?>> getSideInputs() {
- return ImmutableList.of(sideInput1, sideInput2);
- }
+ @Override
+ public List<PCollectionView<?>> getSideInputs() {
+ return ImmutableList.of(sideInput1, sideInput2);
+ }
- private void verifySideInputs() {
- assertThat(sideInput(sideInput1), containsInAnyOrder("a",
"b", "c"));
- Map<String, String> mapSideInput = sideInput(sideInput2);
- assertEquals(3, mapSideInput.size());
- assertThat(
- mapSideInput,
- allOf(hasEntry("a", "a"), hasEntry("b", "b"),
hasEntry("c", "c")));
- }
- })
- .withoutValidation());
- p.run();
+ private void verifySideInputs() {
+ assertThat(sideInput(sideInput1),
containsInAnyOrder("a", "b", "c"));
+ Map<String, String> mapSideInput =
sideInput(sideInput2);
+ assertEquals(3, mapSideInput.size());
+ assertThat(
+ mapSideInput,
+ allOf(hasEntry("a", "a"), hasEntry("b", "b"),
hasEntry("c", "c")));
+ }
+ })
+ .withoutValidation());
Map<Integer, List<TableRow>> expectedTableRows = Maps.newHashMap();
for (String anUserList : userList) {
@@ -316,13 +331,54 @@ private void verifySideInputs() {
expected.add(new TableRow().set("name", nickname).set("id", userid));
}
+ if (!streaming) {
+ List<KV<TableDestination, BigQueryWriteResult>> res =
+ expectedTableRows
+ .entrySet()
+ .stream()
+ .map(
+ entry -> {
+ TableDestination d =
+ new TableDestination(
+ PROJECT_ID
+ + ":"
+ + DATASET_ID
+ + ".userid-"
+ + entry.getKey().toString()
+ + "$"
+ + partitionDecorator,
+ "table for userid " + entry.getKey().toString());
+ return buildKVWithResult(BigQueryHelpers.Status.SUCCEEDED,
d);
+ })
+ .collect(toList());
+
+ PAssert.that(wr.getLoadJobResults()).containsInAnyOrder(res);
+ } else {
+ PAssert.that(wr.getFailedInserts()).empty();
+ }
+
+ p.run();
+
for (Map.Entry<Integer, List<TableRow>> entry :
expectedTableRows.entrySet()) {
assertThat(
- fakeDatasetService.getAllRows("project-id", "dataset-id", "userid-"
+ entry.getKey()),
+ fakeDatasetService.getAllRows(PROJECT_ID, DATASET_ID, "userid-" +
entry.getKey()),
containsInAnyOrder(Iterables.toArray(entry.getValue(),
TableRow.class)));
}
}
+ private KV<TableDestination, BigQueryWriteResult> buildKVWithResult(
+ BigQueryHelpers.Status status, String tableSpec) {
+ return buildKVWithResult(status, new TableDestination(tableSpec, null));
+ }
+
+ private KV<TableDestination, BigQueryWriteResult> buildKVWithResult(
+ BigQueryHelpers.Status status, TableDestination tableDest) {
+ return KV.of(
+ tableDest,
+ new BigQueryWriteResult(
+ status,
BigQueryHelpers.toJsonString(tableDest.getTableReference())));
+ }
+
@Test
public void testTimePartitioningStreamingInserts() throws Exception {
testTimePartitioning(BigQueryIO.Write.Method.STREAMING_INSERTS);
@@ -343,19 +399,29 @@ public void testTimePartitioning(BigQueryIO.Write.Method
insertMethod) throws Ex
new TableSchema()
.setFields(
ImmutableList.of(new
TableFieldSchema().setName("number").setType("INTEGER")));
- p.apply(Create.of(row1, row2))
- .apply(
- BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id")
- .withTestServices(fakeBqServices)
- .withMethod(insertMethod)
- .withSchema(schema)
- .withTimePartitioning(timePartitioning)
- .withoutValidation());
+ WriteResult wr =
+ p.apply(Create.of(row1, row2))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(TABLE_SPEC)
+ .withTestServices(fakeBqServices)
+ .withMethod(insertMethod)
+ .withSchema(schema)
+ .withTimePartitioning(timePartitioning)
+ .withoutValidation());
+
+ if (insertMethod == BigQueryIO.Write.Method.FILE_LOADS) {
+ PAssert.that(wr.getLoadJobResults())
+ .containsInAnyOrder(
+ buildKVWithResult(
+ BigQueryHelpers.Status.SUCCEEDED,
+ new TableDestination(TABLE_SPEC, null, timePartitioning)));
+ } else {
+ PAssert.that(wr.getFailedInserts()).empty();
+ }
+
p.run();
- Table table =
- fakeDatasetService.getTable(
- BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id"));
+ Table table =
fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(TABLE_SPEC));
assertEquals(schema, table.getSchema());
assertEquals(timePartitioning, table.getTimePartitioning());
}
@@ -380,24 +446,32 @@ public void testTriggeredFileLoads() throws Exception {
elements.get(20), Iterables.toArray(elements.subList(21, 30),
TableRow.class))
.advanceWatermarkToInfinity();
- p.apply(testStream)
- .apply(
- BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id")
- .withSchema(
- new TableSchema()
- .setFields(
- ImmutableList.of(
- new
TableFieldSchema().setName("number").setType("INTEGER"))))
- .withTestServices(fakeBqServices)
- .withTriggeringFrequency(Duration.standardSeconds(30))
- .withNumFileShards(2)
- .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
- .withoutValidation());
+ WriteResult wr =
+ p.apply(testStream)
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(TABLE_SPEC)
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("number").setType("INTEGER"))))
+ .withTestServices(fakeBqServices)
+ .withTriggeringFrequency(Duration.standardSeconds(30))
+ .withNumFileShards(2)
+ .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+ .withoutValidation());
+
+ // For some reason only two batches run through when three seems expected.
+ PAssert.that(wr.getLoadJobResults())
+ .containsInAnyOrder(
+ buildKVWithResult(BigQueryHelpers.Status.SUCCEEDED, TABLE_SPEC),
+ buildKVWithResult(BigQueryHelpers.Status.SUCCEEDED, TABLE_SPEC));
+
p.run();
assertThat(
- fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ fakeDatasetService.getAllRows(PROJECT_ID, DATASET_ID, TABLE_ID),
containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
}
@@ -416,24 +490,28 @@ public void testFailuresNoRetryPolicy() throws Exception {
row1, ImmutableList.of(ephemeralError, ephemeralError),
row2, ImmutableList.of(ephemeralError, ephemeralError)));
- p.apply(Create.of(row1, row2, row3))
- .apply(
- BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id")
-
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
- .withSchema(
- new TableSchema()
- .setFields(
- ImmutableList.of(
- new
TableFieldSchema().setName("name").setType("STRING"),
- new
TableFieldSchema().setName("number").setType("INTEGER"))))
- .withTestServices(fakeBqServices)
- .withoutValidation());
+ WriteResult wr =
+ p.apply(Create.of(row1, row2, row3))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(TABLE_SPEC)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("name").setType("STRING"),
+ new
TableFieldSchema().setName("number").setType("INTEGER"))))
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+
+ PAssert.that(wr.getFailedInserts()).empty();
+
p.run();
assertThat(
- fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ fakeDatasetService.getAllRows(PROJECT_ID, DATASET_ID, TABLE_ID),
containsInAnyOrder(row1, row2, row3));
}
@@ -459,7 +537,7 @@ public void testRetryPolicy() throws Exception {
p.apply(Create.of(row1, row2, row3))
.apply(
BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id")
+ .to(TABLE_SPEC)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withSchema(
@@ -479,36 +557,41 @@ public void testRetryPolicy() throws Exception {
// Only row1 and row3 were successfully inserted.
assertThat(
- fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ fakeDatasetService.getAllRows(PROJECT_ID, "dataset-id", "table-id"),
containsInAnyOrder(row1, row3));
}
@Test
public void testWrite() throws Exception {
- p.apply(
- Create.of(
- new TableRow().set("name", "a").set("number", 1),
- new TableRow().set("name", "b").set("number", 2),
- new TableRow().set("name", "c").set("number", 3))
- .withCoder(TableRowJsonCoder.of()))
- .apply(
- BigQueryIO.writeTableRows()
- .to("dataset-id.table-id")
-
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withSchema(
- new TableSchema()
- .setFields(
- ImmutableList.of(
- new
TableFieldSchema().setName("name").setType("STRING"),
- new
TableFieldSchema().setName("number").setType("INTEGER"))))
- .withTestServices(fakeBqServices)
- .withoutValidation());
+ WriteResult wr =
+ p.apply(
+ Create.of(
+ new TableRow().set("name", "a").set("number", 1),
+ new TableRow().set("name", "b").set("number", 2),
+ new TableRow().set("name", "c").set("number", 3))
+ .withCoder(TableRowJsonCoder.of()))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(TABLE_SPEC)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("name").setType("STRING"),
+ new
TableFieldSchema().setName("number").setType("INTEGER"))))
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+
+ PAssert.that(wr.getLoadJobResults())
+
.containsInAnyOrder(buildKVWithResult(BigQueryHelpers.Status.SUCCEEDED,
TABLE_SPEC));
+
p.run();
}
@Test
public void testStreamingWrite() throws Exception {
- p.apply(
+ PCollection<TableRow> failures = p.apply(
Create.of(
new TableRow().set("name", "a").set("number", 1),
new TableRow().set("name", "b").set("number", 2),
@@ -518,7 +601,7 @@ public void testStreamingWrite() throws Exception {
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
.apply(
BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id")
+ .to(TABLE_SPEC)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withSchema(
new TableSchema()
@@ -527,11 +610,15 @@ public void testStreamingWrite() throws Exception {
new
TableFieldSchema().setName("name").setType("STRING"),
new
TableFieldSchema().setName("number").setType("INTEGER"))))
.withTestServices(fakeBqServices)
- .withoutValidation());
+ .withoutValidation())
+ .getFailedInserts();
+
+ PAssert.that(failures).empty();
+
p.run();
assertThat(
- fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ fakeDatasetService.getAllRows(PROJECT_ID, DATASET_ID, TABLE_ID),
containsInAnyOrder(
new TableRow().set("name", "a").set("number", 1),
new TableRow().set("name", "b").set("number", 2),
@@ -673,7 +760,7 @@ public void testWriteWithDynamicTables(boolean streaming)
throws Exception {
Map<String, String> schemas = Maps.newHashMap();
for (int i = 0; i < 5; i++) {
TableDestination destination =
- new TableDestination("project-id:dataset-id" + ".table-id-" + i, "");
+ new TableDestination(PROJECT_ID + ":" + DATASET_ID + ".table-id-" +
i, "");
targetTables.put(i, destination);
// Make sure each target table has its own custom table.
schemas.put(
@@ -704,21 +791,43 @@ public void testWriteWithDynamicTables(boolean streaming)
throws Exception {
PCollectionView<Map<String, String>> schemasView =
p.apply("CreateSchemaMap",
Create.of(schemas)).apply("ViewSchemaAsMap", View.asMap());
- input
- .apply(Window.into(windowFn))
- .apply(
- BigQueryIO.<Integer>write()
- .to(tableFunction)
- .withFormatFunction(i -> new TableRow().set("name", "number" +
i).set("number", i))
-
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withSchemaFromView(schemasView)
- .withTestServices(fakeBqServices)
- .withoutValidation());
+ WriteResult wr =
+ input
+ .apply(Window.into(windowFn))
+ .apply(
+ BigQueryIO.<Integer>write()
+ .to(tableFunction)
+ .withFormatFunction(
+ i -> new TableRow().set("name", "number" +
i).set("number", i))
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withSchemaFromView(schemasView)
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+
+ if (!streaming) {
+ List<KV<TableDestination, BigQueryWriteResult>> destinations =
+ targetTables
+ .entrySet()
+ .stream()
+ .map(
+ entry ->
+ KV.of(
+ entry.getValue(),
+ new BigQueryWriteResult(
+ BigQueryHelpers.Status.SUCCEEDED,
+
BigQueryHelpers.toJsonString(entry.getValue().getTableReference()))))
+ .collect(toList());
+
+ PAssert.that(wr.getLoadJobResults()).containsInAnyOrder(destinations);
+ } else {
+ PAssert.that(wr.getFailedInserts()).empty();
+ }
+
p.run();
for (int i = 0; i < 5; ++i) {
String tableId = String.format("table-id-%d", i);
- String tableSpec = String.format("project-id:dataset-id.%s", tableId);
+ String tableSpec = String.format(PROJECT_ID + ":" + DATASET_ID + ".%s",
tableId);
// Verify that table was created with the correct schema.
assertThat(
@@ -726,64 +835,50 @@ public void testWriteWithDynamicTables(boolean streaming)
throws Exception {
fakeDatasetService
.getTable(
new TableReference()
- .setProjectId("project-id")
- .setDatasetId("dataset-id")
+ .setProjectId(PROJECT_ID)
+ .setDatasetId(DATASET_ID)
.setTableId(tableId))
.getSchema()),
equalTo(schemas.get(tableSpec)));
// Verify that the table has the expected contents.
assertThat(
- fakeDatasetService.getAllRows("project-id", "dataset-id", tableId),
+ fakeDatasetService.getAllRows(PROJECT_ID, DATASET_ID, tableId),
containsInAnyOrder(
new TableRow().set("name", String.format("number%d",
i)).set("number", i),
new TableRow().set("name", String.format("number%d", i +
5)).set("number", i + 5)));
}
}
- @Test
- public void testWriteUnknown() throws Exception {
- p.apply(
- Create.of(
- new TableRow().set("name", "a").set("number", 1),
- new TableRow().set("name", "b").set("number", 2),
- new TableRow().set("name", "c").set("number", 3))
- .withCoder(TableRowJsonCoder.of()))
- .apply(
- BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id")
-
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
- .withTestServices(fakeBqServices)
- .withoutValidation());
-
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("Failed to create load job");
- p.run();
- }
-
@Test
public void testWriteFailedJobs() throws Exception {
- p.apply(
- Create.of(
- new TableRow().set("name", "a").set("number", 1),
- new TableRow().set("name", "b").set("number", 2),
- new TableRow().set("name", "c").set("number", 3))
- .withCoder(TableRowJsonCoder.of()))
- .apply(
- BigQueryIO.writeTableRows()
- .to("dataset-id.table-id")
-
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
- .withTestServices(fakeBqServices)
- .withoutValidation());
-
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("Failed to create load job with id prefix");
- thrown.expectMessage("reached max retries");
- thrown.expectMessage("last failed load job");
+ String table = TABLE_SPEC;
+ WriteResult wr =
+ p.apply(
+ Create.of(
+ new TableRow().set("name", "a").set("number", 1),
+ new TableRow().set("name", "b").set("number", 2),
+ new TableRow().set("name", "c").set("number", 3))
+ .withCoder(TableRowJsonCoder.of()))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(table)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+ PAssert.that(wr.getLoadJobResults())
+ .containsInAnyOrder(
+ KV.of(
+ new TableDestination(table, null),
+ new BigQueryWriteResult(
+ BigQueryHelpers.Status.FAILED,
+
BigQueryHelpers.toJsonString(BigQueryHelpers.parseTableSpec(table)))));
p.run();
}
+ // TODO add a test for UNKNOWN errors
+
@Test
public void testWriteWithMissingSchemaFromView() throws Exception {
PCollectionView<Map<String, String>> view =
@@ -792,13 +887,13 @@ public void testWriteWithMissingSchemaFromView() throws
Exception {
p.apply(Create.empty(TableRowJsonCoder.of()))
.apply(
BigQueryIO.writeTableRows()
- .to("dataset-id.table-id")
+ .to(TABLE_SPEC)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withSchemaFromView(view)
.withTestServices(fakeBqServices)
.withoutValidation());
- thrown.expectMessage("does not contain data for table destination
dataset-id.table-id");
+ thrown.expectMessage("does not contain data for table destination " +
TABLE_SPEC);
p.run();
}
@@ -1040,7 +1135,7 @@ private void testWritePartition(
} else {
for (int i = 0; i < numTables; ++i) {
for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) {
- String tableName = String.format("project-id:dataset-id.tables%05d",
i);
+ String tableName = String.format(PROJECT_ID + ":" + DATASET_ID +
".tables%05d", i);
expectedPartitions.add(ShardedKey.of(new TableDestination(tableName,
""), j));
}
}
@@ -1049,7 +1144,7 @@ private void testWritePartition(
List<WriteBundlesToFiles.Result<TableDestination>> files =
Lists.newArrayList();
Map<String, List<String>> filenamesPerTable = Maps.newHashMap();
for (int i = 0; i < numTables; ++i) {
- String tableName = String.format("project-id:dataset-id.tables%05d", i);
+ String tableName = String.format(PROJECT_ID + ":" + DATASET_ID +
".tables%05d", i);
List<String> filenames =
filenamesPerTable.computeIfAbsent(tableName, k ->
Lists.newArrayList());
for (int j = 0; j < numFilesPerTable; ++j) {
@@ -1148,7 +1243,7 @@ public void testWriteTables() throws Exception {
List<KV<ShardedKey<String>, List<String>>> partitions =
Lists.newArrayList();
for (int i = 0; i < numTables; ++i) {
- String tableName = String.format("project-id:dataset-id.table%05d", i);
+ String tableName = String.format(PROJECT_ID + ":" + DATASET_ID +
".table%05d", i);
TableDestination tableDestination = new TableDestination(tableName,
tableName);
for (int j = 0; j < numPartitions; ++j) {
String tempTableId = BigQueryHelpers.createJobId(jobIdToken,
tableDestination, j, 0);
@@ -1170,8 +1265,8 @@ public void testWriteTables() throws Exception {
String json =
String.format(
-
"{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s\"}",
- tempTableId);
+
"{\"datasetId\":\"%s\",\"projectId\":\"%s\",\"tableId\":\"%s\"}",
+ DATASET_ID, PROJECT_ID, tempTableId);
expectedTempTables.put(tableDestination, json);
}
}
@@ -1196,9 +1291,34 @@ public void testWriteTables() throws Exception {
4,
false);
- PCollection<KV<TableDestination, String>> writeTablesOutput =
+ PCollection<KV<TableDestination, BigQueryWriteResult>> writeTablesResults =
writeTablesInput.apply(writeTables);
+ List<KV<TableDestination, BigQueryWriteResult>> expectedWriteResults =
+ expectedTempTables
+ .entries()
+ .stream()
+ .map(
+ entry ->
+ KV.of(
+ entry.getKey(),
+ new BigQueryWriteResult(
+ BigQueryHelpers.Status.SUCCEEDED,
entry.getValue())))
+ .collect(Collectors.toList());
+
+ PAssert.that(writeTablesResults).containsInAnyOrder(expectedWriteResults);
+
+ PCollection<KV<TableDestination, String>> writeTablesOutput =
+ writeTablesResults.apply(
+ ParDo.of(
+ new DoFn<
+ KV<TableDestination, BigQueryWriteResult>,
KV<TableDestination, String>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().getKey(),
c.element().getValue().getTable()));
+ }
+ }));
+
PAssert.thatMultimap(writeTablesOutput)
.satisfies(
input -> {
@@ -1245,13 +1365,13 @@ public void testWriteRename() throws Exception {
Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
List<KV<TableDestination, String>> tempTablesElement =
Lists.newArrayList();
for (int i = 0; i < numFinalTables; ++i) {
- String tableName = "project-id:dataset-id.table_" + i;
+ String tableName = PROJECT_ID + ":" + DATASET_ID + ".table_" + i;
TableDestination tableDestination = new TableDestination(tableName,
"table_" + i + "_desc");
for (int j = 0; i < numTempTablesPerFinalTable; ++i) {
TableReference tempTable =
new TableReference()
- .setProjectId("project-id")
- .setDatasetId("dataset-id")
+ .setProjectId(PROJECT_ID)
+ .setDatasetId(DATASET_ID)
.setTableId(String.format("%s_%05d_%05d", jobIdToken, i, j));
fakeDatasetService.createTable(new
Table().setTableReference(tempTable));
@@ -1308,24 +1428,23 @@ public void testWriteRename() throws Exception {
@Test
public void testRemoveTemporaryTables() throws Exception {
FakeDatasetService datasetService = new FakeDatasetService();
- String projectId = "project";
- String datasetId = "dataset";
- datasetService.createDataset(projectId, datasetId, "", "", null);
+ datasetService.createDataset(PROJECT_ID, DATASET_ID, "", "", null);
List<TableReference> tableRefs =
Lists.newArrayList(
BigQueryHelpers.parseTableSpec(
- String.format("%s:%s.%s", projectId, datasetId, "table1")),
+ String.format("%s:%s.%s", PROJECT_ID, DATASET_ID, "table1")),
BigQueryHelpers.parseTableSpec(
- String.format("%s:%s.%s", projectId, datasetId, "table2")),
+ String.format("%s:%s.%s", PROJECT_ID, DATASET_ID, "table2")),
BigQueryHelpers.parseTableSpec(
- String.format("%s:%s.%s", projectId, datasetId, "table3")));
+ String.format("%s:%s.%s", PROJECT_ID, DATASET_ID, "table3")));
for (TableReference tableRef : tableRefs) {
datasetService.createTable(new Table().setTableReference(tableRef));
}
// Add one more table to delete that does not actually exist.
tableRefs.add(
- BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId,
datasetId, "table4")));
+ BigQueryHelpers.parseTableSpec(
+ String.format("%s:%s.%s", PROJECT_ID, DATASET_ID, "table4")));
WriteRename.removeTemporaryTables(datasetService, tableRefs);
@@ -1367,7 +1486,7 @@ public void testWriteToTableDecorator() throws Exception {
p.apply(Create.of(row1, row2))
.apply(
BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id$20171127")
+ .to(PROJECT_ID + ":" + DATASET_ID + ".table-id$20171127")
.withTestServices(fakeBqServices)
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withSchema(schema)
@@ -1380,7 +1499,7 @@ public void testExtendedErrorRetrieval() throws Exception
{
TableRow row1 = new TableRow().set("name", "a").set("number", "1");
TableRow row2 = new TableRow().set("name", "b").set("number", "2");
TableRow row3 = new TableRow().set("name", "c").set("number", "3");
- String tableSpec = "project-id:dataset-id.table-id";
+ String tableSpec = TABLE_SPEC;
TableDataInsertAllResponse.InsertErrors ephemeralError =
new TableDataInsertAllResponse.InsertErrors()
@@ -1423,7 +1542,7 @@ public void testExtendedErrorRetrieval() throws Exception
{
// Only row1 and row3 were successfully inserted.
assertThat(
- fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ fakeDatasetService.getAllRows(PROJECT_ID, DATASET_ID, TABLE_ID),
containsInAnyOrder(row1, row3));
}
@@ -1434,7 +1553,7 @@ public void testWrongErrorConfigs() {
BigQueryIO.Write<TableRow> bqIoWrite =
BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id")
+ .to(TABLE_SPEC)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withSchema(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryWriteResultCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryWriteResultCoderTest.java
new file mode 100644
index 000000000000..f1080c364d73
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryWriteResultCoderTest.java
@@ -0,0 +1,22 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test case for {@link BigQueryWriteResultCoder}. */
+@RunWith(JUnit4.class)
+public class BigQueryWriteResultCoderTest {
+
+ private static final Coder<BigQueryWriteResult> TEST_CODER =
BigQueryWriteResultCoder.of();
+
+ @Test
+ public void testDecodeEncodeEqual() throws Exception {
+ BigQueryWriteResult value =
+ new BigQueryWriteResult(BigQueryHelpers.Status.SUCCEEDED,
"dummy_table");
+
+ CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
+ }
+}
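
For readers following the new coder, here is a minimal sketch of the round-trip that BigQueryWriteResultCoderTest exercises, invoked directly via Beam's CoderUtils. It assumes the BigQueryWriteResult and BigQueryWriteResultCoder classes introduced in this PR; since the BigQueryWriteResult constructor is package-private, such code would have to live in the org.apache.beam.sdk.io.gcp.bigquery package, just as the test does. The class name is purely illustrative.

package org.apache.beam.sdk.io.gcp.bigquery;

import org.apache.beam.sdk.util.CoderUtils;

/** Illustrative round-trip of a BigQueryWriteResult through the proposed coder. */
public class BigQueryWriteResultCoderRoundTrip {
  public static void main(String[] args) throws Exception {
    BigQueryWriteResult original =
        new BigQueryWriteResult(BigQueryHelpers.Status.SUCCEEDED, "dummy_table");

    // The coder writes the status name and the table string as two UTF-8 strings,
    // so encoding and then decoding must yield an equal value.
    byte[] encoded = CoderUtils.encodeToByteArray(BigQueryWriteResultCoder.of(), original);
    BigQueryWriteResult decoded =
        CoderUtils.decodeFromByteArray(BigQueryWriteResultCoder.of(), encoded);

    if (!original.equals(decoded)) {
      throw new AssertionError("Coder round-trip changed the value: " + decoded);
    }
    System.out.println("Round-trip OK: " + decoded);
  }
}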
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 175843)
Time Spent: 2h 20m (was: 2h 10m)
> Get BigQueryIO batch loads to return something actionable
> ---------------------------------------------------------
>
> Key: BEAM-4824
> URL: https://issues.apache.org/jira/browse/BEAM-4824
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Carlos Alonso
> Assignee: Carlos Alonso
> Priority: Minor
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> At the moment, BigQueryIO batch loads return an empty collection that carries no
> information about how the load job finished; it is even returned before the job
> finishes.
>
> Change it so that:
> # The returned PCollection only appears once the job has actually finished
> # The returned PCollection contains information about the job result
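
To make the intended behaviour concrete, the following is a rough sketch of how a pipeline could consume the per-table load results once this change lands. The getLoadJobResults() accessor and the BigQueryWriteResult type come from the PR above and are not part of a released SDK; the project, dataset, and table names are placeholders.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryWriteResult;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

public class LoadJobResultsExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    TableSchema schema =
        new TableSchema()
            .setFields(
                Collections.singletonList(
                    new TableFieldSchema().setName("number").setType("INTEGER")));

    // Placeholder table spec; a real pipeline would point at an existing project/dataset.
    WriteResult result =
        p.apply(
                Create.of(new TableRow().set("number", 1), new TableRow().set("number", 2))
                    .withCoder(TableRowJsonCoder.of()))
            .apply(
                BigQueryIO.writeTableRows()
                    .to("my-project:my_dataset.my_table")
                    .withSchema(schema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withMethod(BigQueryIO.Write.Method.FILE_LOADS));

    // With this change, per-destination outcomes arrive only after the load jobs finish,
    // so downstream steps can branch on SUCCEEDED / FAILED instead of guessing.
    result
        .getLoadJobResults()
        .apply(
            "ReactToLoadOutcome",
            ParDo.of(
                new DoFn<KV<TableDestination, BigQueryWriteResult>, Void>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    BigQueryWriteResult r = c.element().getValue();
                    System.out.println(
                        c.element().getKey().getTableSpec() + " -> " + r.getStatus());
                  }
                }));

    p.run().waitUntilFinish();
  }
}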
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)