[
https://issues.apache.org/jira/browse/BEAM-5191?focusedWorklogId=267724&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-267724
]
ASF GitHub Bot logged work on BEAM-5191:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Jun/19 15:20
Start Date: 26/Jun/19 15:20
Worklog Time Spent: 10m
Work Description: jklukas commented on pull request #8945: [BEAM-5191]
Support for BigQuery clustering
URL: https://github.com/apache/beam/pull/8945#discussion_r297726367
##########
File path:
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
##########
@@ -391,6 +392,93 @@ public void testTimePartitioning(BigQueryIO.Write.Method
insertMethod) throws Ex
assertEquals(timePartitioning, table.getTimePartitioning());
}
+ @Test
+ public void testClusteringStreamingInserts() throws Exception {
+ testClustering(BigQueryIO.Write.Method.STREAMING_INSERTS);
+ }
+
+ @Test
+ public void testClusteringBatchLoads() throws Exception {
+ testClustering(BigQueryIO.Write.Method.FILE_LOADS);
+ }
+
+ public void testClustering(BigQueryIO.Write.Method insertMethod) throws
Exception {
+ TableRow row1 = new TableRow().set("date", "2018-01-01").set("number",
"1");
+ TableRow row2 = new TableRow().set("date", "2018-01-02").set("number",
"2");
+
+ TimePartitioning timePartitioning = new
TimePartitioning().setType("DAY").setField("date");
+ Clustering clustering = new
Clustering().setFields(ImmutableList.of("date"));
+ TableSchema schema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema()
+ .setName("date")
+ .setType("DATE")
+ .setName("number")
+ .setType("INTEGER")));
+ p.apply(Create.of(row1, row2))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to("project-id:dataset-id.table-id")
+ .withTestServices(fakeBqServices)
+ .withMethod(insertMethod)
+ .withSchema(schema)
+ .withTimePartitioning(timePartitioning)
+ .withClustering(clustering)
+ .withoutValidation());
+ p.run();
+ Table table =
+ fakeDatasetService.getTable(
+ BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id"));
+ assertEquals(schema, table.getSchema());
+ assertEquals(timePartitioning, table.getTimePartitioning());
+ assertEquals(clustering, table.getClustering());
+ }
+
+ @Test
+ public void testClusteringTableFunction() throws Exception {
+ TableRow row1 = new TableRow().set("date", "2018-01-01").set("number",
"1");
+ TableRow row2 = new TableRow().set("date", "2018-01-02").set("number",
"2");
+
+ TimePartitioning timePartitioning = new
TimePartitioning().setType("DAY").setField("date");
+ Clustering clustering = new
Clustering().setFields(ImmutableList.of("date"));
+ TableSchema schema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema()
+ .setName("date")
+ .setType("DATE")
+ .setName("number")
+ .setType("INTEGER")));
+ p.apply(Create.of(row1, row2))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(
+ (ValueInSingleWindow<TableRow> vsw) -> {
+ String tableSpec =
+ "project-id:dataset-id.table-" +
vsw.getValue().get("number");
+ return new TableDestination(
+ tableSpec,
+ null,
+ new
TimePartitioning().setType("DAY").setField("date"),
+ new
Clustering().setFields(ImmutableList.of("date")));
+ })
+ .withTestServices(fakeBqServices)
+ .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+ .withSchema(schema)
+ .enableClustering()
Review comment:
Notably, this test fails if `enableClustering()` is not called because the
default `TableDestinationCoderV2` is used and clustering information is dropped
before the table is created. This is exactly the behavior we want for backwards
compatibility.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 267724)
Time Spent: 8h 20m (was: 8h 10m)
> Add support for writing to BigQuery clustered tables
> ----------------------------------------------------
>
> Key: BEAM-5191
> URL: https://issues.apache.org/jira/browse/BEAM-5191
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Affects Versions: 2.6.0
> Reporter: Robert Sahlin
> Assignee: Wout Scheepers
> Priority: Minor
> Labels: features, newbie
> Time Spent: 8h 20m
> Remaining Estimate: 0h
>
> Google recently added support for clustered tables in BigQuery. It would be
> useful to set clustering columns the same way as for partitioning. It should
> support multiple fields (4) for clustering.
> For example:
> [BigQueryIO.Write|https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html]<[T|https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html]>
> .withClustering(new Clustering().setField("productId").setType("STRING"))
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)