[ https://issues.apache.org/jira/browse/BEAM-2768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16145351#comment-16145351 ]
Matti Remes commented on BEAM-2768:
-----------------------------------
Sorry for the slow response. The only error I can see is the API error from the
issue description. It still refers to the same jobId, which means only the
first insert gets through.
The DynamicDestinations class looks like this; nothing fancy:
{code:java}
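import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Routes each row to a daily table: project:dataset.table_yyyyMMdd.
// Schema is an application-specific wrapper class (not shown).
public class EventDestination extends DynamicDestinations<TableRow, String> {
  private static final DateTimeFormatter format =
      DateTimeFormat.forPattern("yyyyMMdd");
  private static final Logger LOG =
      LoggerFactory.getLogger(EventDestination.class);

  private final String project;
  private final String dataset;
  private final String table;
  private final Schema schema;

  public EventDestination(String project, String dataset, String table,
      Schema schema) {
    this.project = project;
    this.dataset = dataset;
    this.table = table;
    this.schema = schema;
  }

  // Converts an ISO-8601 timestamp into a yyyyMMdd table suffix.
  private String getTableSuffix(String timestamp) {
    DateTime date = DateTime.parse(timestamp);
    return date.toString(format);
  }

  // The destination key is the row's raw "timestamp" value, so every distinct
  // timestamp is a separate destination, even when several resolve to the
  // same daily table in getTable().
  @Override
  public String getDestination(
      ValueInSingleWindow<TableRow> valueInSingleWindow) {
    return valueInSingleWindow.getValue().get("timestamp").toString();
  }

  @Override
  public TableDestination getTable(String s) {
    String tableFormat = "%s:%s.%s_%s";
    String suffix = getTableSuffix(s);
    String tableName =
        String.format(tableFormat, project, dataset, table, suffix);
    return new TableDestination(tableName, null); // null: no table description
  }

  @Override
  public TableSchema getSchema(String s) {
    try {
      return this.schema.getSchema();
    } catch (IOException e) {
      LOG.error("Invalid table schema passed", e);
      // Fail fast rather than handing BigQueryIO a null schema.
      throw new RuntimeException("Invalid table schema passed", e);
    }
  }
}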
{code}
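To make the key-to-table mapping concrete, here is a plain-Java sketch of the same getDestination/getTable logic. It is only an illustration under stated assumptions: it uses java.time in place of Joda-Time so it runs without Beam on the classpath, and the class name DestinationGrouping plus the project/dataset/table names are hypothetical. It groups destination keys (raw timestamps) by the table each one resolves to.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for EventDestination's key/table logic (not Beam code).
// Uses java.time instead of Joda-Time; expects ISO-8601 timestamps with an offset.
final class DestinationGrouping {
  private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

  // Mirrors getTable(): builds "project:dataset.table_yyyyMMdd" from a timestamp.
  static String tableFor(String project, String dataset, String table, String timestamp) {
    String suffix = OffsetDateTime.parse(timestamp).format(DAY);
    return String.format("%s:%s.%s_%s", project, dataset, table, suffix);
  }

  // Groups destination keys (the raw timestamps) by the table they resolve to.
  // "my-project", "analytics" and "events" are hypothetical names.
  static Map<String, List<String>> groupByTable(List<String> timestamps) {
    Map<String, List<String>> byTable = new TreeMap<>();
    for (String ts : timestamps) {
      String tableName = tableFor("my-project", "analytics", "events", ts);
      byTable.computeIfAbsent(tableName, k -> new ArrayList<>()).add(ts);
    }
    return byTable;
  }
}
```

For the keys 2017-08-28T10:15:00Z, 2017-08-28T23:59:59Z and 2017-08-29T00:00:01Z, the sketch yields two tables, and the events_20170828 table receives two distinct destination keys.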
> Fix bigquery.WriteTables generating non-unique job identifiers
> --------------------------------------------------------------
>
> Key: BEAM-2768
> URL: https://issues.apache.org/jira/browse/BEAM-2768
> Project: Beam
> Issue Type: Bug
> Components: beam-model
> Affects Versions: 2.0.0
> Reporter: Matti Remes
> Assignee: Reuven Lax
>
> BigQueryIO does not create unique job ids for batch loads, so the BigQuery
> API responds with a 409 Conflict error:
> {code:java}
> Request failed with code 409, will NOT retry:
> https://www.googleapis.com/bigquery/v2/projects/<project_id>/jobs
> {code}
> The jobs are started in the BatchLoads/SinglePartitionWriteTables step, by
> its WriteTables ParDo:
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L511-L521
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L148
> It would probably be a good idea to append a UUID to each job id.
> Edit: This is a major bug that blocks using BigQuery as a sink for bounded input.
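The UUID suggestion can be sketched in plain Java. This is a hypothetical illustration, not Beam's API: the JobIds class, its methods, and the id layout (prefix, table-spec hash, partition) are assumptions loosely modeled on the inputs WriteTables has available. BigQuery job ids may contain only letters, digits, underscores and dashes, up to 1,024 characters, so the sketch sanitizes and truncates.

```java
import java.util.UUID;

// Hypothetical helper contrasting deterministic and UUID-suffixed job ids.
final class JobIds {
  private static final int MAX_JOB_ID_LENGTH = 1024;

  // Deterministic id: identical inputs always yield the same id, so two load
  // jobs built from the same inputs collide (HTTP 409 from BigQuery).
  static String deterministic(String prefix, String tableSpec, int partition) {
    return sanitize(prefix + "_" + Integer.toHexString(tableSpec.hashCode()) + "_" + partition);
  }

  // Appends a random UUID (dashes removed) so every call yields a fresh id.
  static String unique(String prefix, String tableSpec, int partition) {
    String id = deterministic(prefix, tableSpec, partition) + "_"
        + UUID.randomUUID().toString().replace("-", "");
    // Keep the tail on overflow so the random suffix survives truncation.
    return id.length() <= MAX_JOB_ID_LENGTH ? id : id.substring(id.length() - MAX_JOB_ID_LENGTH);
  }

  // Replaces characters BigQuery does not allow in job ids.
  static String sanitize(String s) {
    return s.replaceAll("[^A-Za-z0-9_-]", "_");
  }
}
```

One trade-off to keep in mind: if the random part is regenerated when a bundle is retried, BigQuery can no longer recognise the retry as the same job, so a real fix would likely need to generate the random part once per logical load job.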
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)