[
https://issues.apache.org/jira/browse/BEAM-2370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709198#comment-16709198
]
Masud Hasan commented on BEAM-2370:
-----------------------------------
Hello, I am not sure whether this issue is related, but I am using 2.7
and get the following error message when writing to a partitioned table
with dynamic destinations. Please see below:
{code:java}
org.apache.beam.sdk.util.RetryHttpRequestInitializer$LoggingHttpBackOffHandler
handleResponse
WARNING: Request failed with code 400, performed 0 retries due to IOExceptions,
performed 0 retries due to unsuccessful status codes, HTTP framework says
request can be retried, (caller responsible for retrying):
https://www.googleapis.com/bigquery/v2/projects/bq-sql-load-test/datasets/telco_analytics/tables/clickstream_raw$20181204/insertAll
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.RuntimeException:
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "Cannot read partition information from a table that is not partitioned: bq-sql-load-test:telco_analytics.clickstream_raw$20181204",
    "reason" : "invalid"
  } ],
  "message" : "Cannot read partition information from a table that is not partitioned: bq-sql-load-test:telco_analytics.clickstream_raw$20181204",
  "status" : "INVALID_ARGUMENT"
}
{code}
{code:java}
outputData.apply("Format BQ Output Data", ParDo.of(new FormatBQOutputData()))
    .apply(BigQueryIO.writeTableRows()
        .to(new DynamicDestinations<TableRow, String>() {
            @Override
            public String getDestination(ValueInSingleWindow<TableRow> element) {
                List<KV<String, Map<String, String>>> inputs = sideInput(schemasView);
                String tableSpec = inputs.get(0).getKey();
                TableRow row = element.getValue();
                String partitionDate =
                    row.get("date_time").toString().replace("-", "").substring(0, 8);
                String destination = String.format("%s$%s", tableSpec, partitionDate);
                LOG.info("Destination: " + destination);
                return destination;
            }

            @Override
            public List<PCollectionView<?>> getSideInputs() {
                return ImmutableList.of(schemasView);
            }

            @Override
            public TableDestination getTable(String destination) {
                return new TableDestination(destination, null);
            }

            @Override
            public TableSchema getSchema(String destination) {
                List<KV<String, Map<String, String>>> inputs = sideInput(schemasView);
                Map<String, String> schemas = inputs.get(0).getValue();
                ...
                return bqSchema;
            }
        })
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
{code}
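For reference, the decorator derivation in getDestination above can be
isolated and checked without running a pipeline. The following is only a
sketch: the class and method names (PartitionDecorator, partitionedSpec)
and the sample timestamp are hypothetical, and it assumes date_time
values start with a yyyy-MM-dd date, as the replace/substring logic in
the snippet implies.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Beam: mirrors the replace/substring
// logic from getDestination and adds a calendar-date sanity check.
final class PartitionDecorator {
    private PartitionDecorator() {}

    /** Returns "tableSpec$yyyyMMdd" for a dateTime that starts with yyyy-MM-dd. */
    static String partitionedSpec(String tableSpec, String dateTime) {
        if (dateTime == null) {
            throw new IllegalArgumentException("date_time is missing");
        }
        String compact = dateTime.replace("-", "");
        if (compact.length() < 8) {
            throw new IllegalArgumentException("date_time too short: " + dateTime);
        }
        String partitionDate = compact.substring(0, 8);
        // Throws DateTimeParseException if the eight characters are not a real date.
        LocalDate.parse(partitionDate, DateTimeFormatter.BASIC_ISO_DATE);
        return tableSpec + "$" + partitionDate;
    }

    public static void main(String[] args) {
        // Table spec taken from the error above; the time of day is made up.
        System.out.println(partitionedSpec(
            "bq-sql-load-test:telco_analytics.clickstream_raw",
            "2018-12-04 10:15:00"));
        // prints bq-sql-load-test:telco_analytics.clickstream_raw$20181204
    }
}
```

This only confirms that the destination string has the expected
table$yyyyMMdd shape; it says nothing about whether the target table was
created as a partitioned table.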
> BigQuery Insert with Partition Decorator throwing error
> -------------------------------------------------------
>
> Key: BEAM-2370
> URL: https://issues.apache.org/jira/browse/BEAM-2370
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.0.0
> Environment: DirectRunner
> Reporter: Andre
> Assignee: Reuven Lax
> Priority: Major
> Fix For: 2.2.0
>
>
> Running a DataFlow job with the DirectRunner which is inserting data into a
> partitioned table using decorators throws the following error multiple times
> BUT still inserts records into the right partition.
> {code:java|title=Error}
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl executeWithRetries
> INFO: Ignore the error and retry the request.
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad
> Request
> {
>   "code" : 400,
>   "errors" : [ {
>     "domain" : "global",
>     "message" : "Invalid table ID \"mytable_orders$20170516\". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used.",
>     "reason" : "invalid"
>   } ],
>   "message" : "Invalid table ID \"mytable_orders$20170516\". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used."
> }
> at
> com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
> {code}
> {code:java|title=Code}
> // Write TableRows to BQ
> rows.apply("TransformationStep", ParDo.of(new Outputter()))
>     .apply("WindowDaily", Window.<TableRow>into(CalendarWindows.days(1)))
>     .apply("WriteToBQ", BigQueryIO.writeTableRows()
>         .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
>             private static final long serialVersionUID = 8196602721734820219L;
>
>             @Override
>             public TableDestination apply(ValueInSingleWindow<TableRow> value) {
>                 String dayString =
>                     DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC)
>                         .print(((IntervalWindow) value.getWindow()).start());
>                 TableDestination td =
>                     new TableDestination("my-project:dataset.mytable_orders$" + dayString, "");
>                 return td;
>             }
>         }).withSchema(mySchema)
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
> {code}