Abacn opened a new issue, #26796:
URL: https://github.com/apache/beam/issues/26796
### What happened?
I was trying to reproduce #26789 but found another apparent bug before setting
up all the conditions:
Attached is the full code. Basically, it writes to a table with the schema
order shuffled, the TableRow insertion order shuffled, and the number of fields
varied, then checks whether data corruption, data loss, or an exception occurs.
However, it turns out that if the TableRow contains a TIMESTAMP field whose
value is a String (set `initcolumns = 5` below), no records get written to
BigQuery, and no error is shown.
(With `initcolumns = 4`, which avoids the TIMESTAMP field, all records are
written successfully to BigQuery.)
```java
package com.github.abacn;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Reproduce the issue of <a href="https://github.com/apache/beam/issues/26789">BigQueryIO
 * Storage API write data corruption</a>
 */
public class BigQueryStorageWriteDemo {

  public static void main(String[] argv) {
    PipelineOptions option = PipelineOptionsFactory.fromArgs(argv).create();
    Pipeline pipeline = Pipeline.create(option);

    final int initcolumns = 5;
    TableSchema initSchema = getTableSchema(initcolumns, true);

    BigQueryIO.Write<Long> writeIO = BigQueryIO.<Long>write()
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withFormatFunction(new FormatFn(initcolumns, false))
        .to("*******") // table specifier
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withSchema(initSchema);
        // .withAutoSchemaUpdate(true)
        // .ignoreUnknownValues();

    pipeline.apply(GenerateSequence.from(0).to(10)).apply(writeIO);
    pipeline.run().waitUntilFinish();
    System.out.println("Finished");
    System.exit(0);
  }

  static TableSchema getTableSchema(int numColumns, boolean isShuffle) {
    List<TableFieldSchema> fields = new ArrayList<>(numColumns);
    for (int idx = 0; idx < numColumns; ++idx) {
      switch (idx) {
        case 0:
          fields.add(new TableFieldSchema().setName("int_value").setType("INTEGER"));
          break;
        case 1:
          fields.add(new TableFieldSchema().setName("double_value").setType("FLOAT"));
          break;
        case 2:
          fields.add(new TableFieldSchema().setName("string_value").setType("STRING"));
          break;
        case 3:
          fields.add(new TableFieldSchema().setName("boolean_value").setType("BOOLEAN"));
          break;
        case 4:
          fields.add(new TableFieldSchema().setName("time_stamp_value").setType("TIMESTAMP"));
          break;
        default:
          fields.add(new TableFieldSchema().setName("field_" + idx).setType("INTEGER"));
      }
    }
    if (isShuffle) {
      Collections.shuffle(fields);
    }
    return new TableSchema().setFields(fields);
  }

  private static class FormatFn implements SerializableFunction<Long, TableRow> {
    protected final int numColumns;
    protected final boolean isShuffle;

    public FormatFn(int numColumns, boolean isShuffle) {
      this.numColumns = numColumns;
      this.isShuffle = isShuffle;
    }

    @Override
    public TableRow apply(Long input) {
      List<Integer> arrs = new ArrayList<>(numColumns);
      for (int idx = 0; idx < numColumns; idx++) {
        arrs.add(idx);
      }
      if (isShuffle) {
        Collections.shuffle(arrs);
      }
      TableRow row = new TableRow();
      for (int idx : arrs) {
        switch (idx) {
          case 0:
            row.set("int_value", input);
            break;
          case 1:
            row.set("double_value", input.doubleValue() + 0.1);
            break;
          case 2:
            row.set("string_value", input.toString() + "S");
            break;
          case 3:
            row.set("boolean_value", (input % 2 == 0));
            break;
          case 4:
            row.set("time_stamp_value", String.format("%d-01-03 12:34:56+00", 2000 + input));
            break;
          default:
            row.set("field_" + idx, input);
        }
      }
      return row;
    }
  }
}
```
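For reference, these are the exact timestamp strings the format function sends into the TIMESTAMP column, generated standalone (a minimal sketch of the `case 4` formatting logic only; `TimestampFormatDemo` is a hypothetical helper name, not part of the repro):

```java
public class TimestampFormatDemo {
  public static void main(String[] args) {
    for (long input = 0; input < 3; input++) {
      // Same formatting as FormatFn.apply's TIMESTAMP branch above:
      // "YYYY-01-03 12:34:56+00" with the year derived from the sequence value
      String ts = String.format("%d-01-03 12:34:56+00", 2000 + input);
      System.out.println(ts);
    }
  }
}
```

These strings parse as valid TIMESTAMP values in a plain BigQuery SQL query, which is what makes the silent drop surprising.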
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner