[
https://issues.apache.org/jira/browse/BEAM-2870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271657#comment-16271657
]
ASF GitHub Bot commented on BEAM-2870:
--------------------------------------
asfgit closed pull request #4177: [BEAM-2870] Strips partition decorators when
creating/patching tables in batch
URL: https://github.com/apache/beam/pull/4177
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 9ca253ca8df..7077651ced2 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -264,7 +264,9 @@ private void load(
switch (jobStatus) {
case SUCCEEDED:
if (tableDescription != null) {
- datasetService.patchTableDescription(ref, tableDescription);
+ datasetService.patchTableDescription(
+
ref.clone().setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
+ tableDescription);
}
return;
case UNKNOWN:
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 08f74649859..31c1781f610 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -589,7 +589,12 @@ public String apply(String arg) {
if (streaming) {
users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+
}
+
+ // Use a partition decorator to verify that partition decorators are
supported.
+ final String partitionDecorator = "20171127";
+
users.apply(
"WriteBigQuery",
BigQueryIO.<String>write()
@@ -630,7 +635,8 @@ public TableDestination getTable(Integer userId) {
verifySideInputs();
// Each user in it's own table.
return new TableDestination(
- "dataset-id.userid-" + userId, "table for userid " +
userId);
+ "dataset-id.userid-" + userId + "$" +
partitionDecorator,
+ "table for userid " + userId);
}
@Override
@@ -2528,7 +2534,7 @@ public void testWriteToTableDecorator() throws Exception {
p.apply(Create.of(row1, row2))
.apply(
BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id$decorator")
+ .to("project-id:dataset-id.table-id$20171127")
.withTestServices(fakeBqServices)
.withMethod(Method.STREAMING_INSERTS)
.withSchema(schema)
@@ -2539,7 +2545,7 @@ public void testWriteToTableDecorator() throws Exception {
@Test
public void testTableDecoratorStripping() {
assertEquals("project:dataset.table",
-
BigQueryHelpers.stripPartitionDecorator("project:dataset.table$decorator"));
+
BigQueryHelpers.stripPartitionDecorator("project:dataset.table$20171127"));
assertEquals("project:dataset.table",
BigQueryHelpers.stripPartitionDecorator("project:dataset.table"));
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index ffab1239217..79e03e84a58 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -96,6 +96,7 @@ private TableContainer getTableContainer(String projectId,
String datasetId, Str
@Override
public void deleteTable(TableReference tableRef) throws IOException,
InterruptedException {
+ validateTableReference(tableRef);
synchronized (BigQueryIOTest.tables) {
Map<String, TableContainer> dataset =
BigQueryIOTest.tables.get(tableRef.getProjectId(),
tableRef.getDatasetId());
@@ -109,12 +110,8 @@ public void deleteTable(TableReference tableRef) throws
IOException, Interrupted
}
}
-
- @Override
- public void createTable(Table table) throws IOException {
+ private static void validateTableReference(TableReference tableReference)
throws IOException {
final Pattern tableRegexp = Pattern.compile("[-\\w]{1,1024}");
-
- TableReference tableReference = table.getTableReference();
if (!tableRegexp.matcher(tableReference.getTableId()).matches()) {
throw new IOException(
String.format(
@@ -123,6 +120,12 @@ public void createTable(Table table) throws IOException {
+ " decorators cannot be used.",
tableReference.getTableId()));
}
+ }
+
+ @Override
+ public void createTable(Table table) throws IOException {
+ TableReference tableReference = table.getTableReference();
+ validateTableReference(tableReference);
synchronized (BigQueryIOTest.tables) {
Map<String, TableContainer> dataset =
BigQueryIOTest.tables.get(tableReference.getProjectId(),
tableReference.getDatasetId());
@@ -245,6 +248,7 @@ public long insertAll(
public Table patchTableDescription(TableReference tableReference,
@Nullable String tableDescription)
throws IOException, InterruptedException {
+ validateTableReference(tableReference);
synchronized (BigQueryIOTest.tables) {
TableContainer tableContainer =
getTableContainer(tableReference.getProjectId(),
tableReference.getDatasetId(), tableReference.getTableId());
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index edf2a55142a..fcf464fbdfb 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -312,7 +312,14 @@ private JobStatus runLoadJob(JobReference jobRef,
JobConfigurationLoad load)
return new JobStatus().setState("FAILED").setErrorResult(new
ErrorProto());
}
if (existingTable == null) {
- existingTable = new
Table().setTableReference(destination).setSchema(schema);
+ TableReference strippedDestination =
+ destination
+ .clone()
+
.setTableId(BigQueryHelpers.stripPartitionDecorator(destination.getTableId()));
+ existingTable =
+ new Table()
+ .setTableReference(strippedDestination)
+ .setSchema(schema);
if (load.getTimePartitioning() != null) {
existingTable =
existingTable.setTimePartitioning(load.getTimePartitioning());
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> BQ Partitioned Table Write Fails When Destination has Partition Decorator
> -------------------------------------------------------------------------
>
> Key: BEAM-2870
> URL: https://issues.apache.org/jira/browse/BEAM-2870
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.2.0
> Environment: Dataflow Runner, Streaming, 10 x (n1-highmem-8 & 500gb
> SDD)
> Reporter: Steven Jon Anderson
> Assignee: Reuven Lax
> Labels: bigquery, dataflow, google, google-cloud-bigquery,
> google-dataflow
> Fix For: 2.2.0, 2.3.0
>
>
> Dataflow Job ID:
> https://console.cloud.google.com/dataflow/job/2017-09-08_23_03_14-14637186041605198816
> Tagging [~reuvenlax] as I believe he built the time partitioning integration
> that was merged into master.
> *Background*
> Our production pipeline ingests millions of events per day and routes events
> into our clients' numerous tables. To keep costs down, all of our tables are
> partitioned. However, this requires that we create the tables before we allow
> events to process as creating partitioned tables isn't supported in 2.1.0.
> We've been looking forward to [~reuvenlax]'s partition table write feature
> ([#3663|https://github.com/apache/beam/pull/3663]) to get merged into master
> for some time now as it'll allow us to launch our client platforms much, much
> faster. Today we got around to testing the 2.2.0 nightly and discovered this
> bug.
> *Issue*
> Our pipeline writes to a table with a decorator. When attempting to write to
> an existing partitioned table with a decorator, the write succeeds. When
> writing to a partitioned table that doesn't exist yet, using a destination
> without a decorator, the write succeeds. *However, when writing to a partitioned table that
> doesn't exist with a decorator, the write fails*.
> *Example Implementation*
> {code:java}
> BigQueryIO.writeTableRows()
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
> .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
> .to(new DynamicDestinations<TableRow, String>() {
> @Override
> public String getDestination(ValueInSingleWindow<TableRow> element) {
> return "PROJECT_ID:DATASET_ID.TABLE_ID$20170902";
> }
> @Override
> public TableDestination getTable(String destination) {
> TimePartitioning DAY_PARTITION = new TimePartitioning().setType("DAY");
> return new TableDestination(destination, null, DAY_PARTITION);
> }
> @Override
> public TableSchema getSchema(String destination) {
> return TABLE_SCHEMA;
> }
> })
> {code}
> *Relevant Logs & Errors in StackDriver*
> {code:none}
> 23:06:26.790
> Trying to create BigQuery table: PROJECT_ID:DATASET_ID.TABLE_ID$20170902
> 23:06:26.873
> Invalid table ID \"TABLE_ID$20170902\". Table IDs must be alphanumeric (plus
> underscores) and must be at most 1024 characters long. Also, Table decorators
> cannot be used.
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)