[ 
https://issues.apache.org/jira/browse/BEAM-7425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Goenka updated BEAM-7425:
-------------------------------
    Fix Version/s: 2.14.0

> Reading BigQuery Table Data into Java Classes(Pojo) Directly
> ------------------------------------------------------------
>
>                 Key: BEAM-7425
>                 URL: https://issues.apache.org/jira/browse/BEAM-7425
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>    Affects Versions: 2.12.0
>         Environment: Dataflow
>            Reporter: Kishan Kumar
>            Priority: Major
>              Labels: newbie, patch
>             Fix For: 2.14.0
>
>
> While developing my pipeline, I used the code snippet below to read table
> data from BigQuery:
> PCollection<ReasonCode> gpseEftReasonCodes =
>     input.apply(
>         "Reading xxyyzz",
>         BigQueryIO.read(new ReadTable<ReasonCode>(ReasonCode.class))
>             .withoutValidation()
>             .withTemplateCompatibility()
>             .fromQuery("Select * from dataset.xxyyzz")
>             .usingStandardSql()
>             .withCoder(SerializableCoder.of(xxyyzz.class)));
> ReadTable class:
> @DefaultSchema(JavaBeanSchema.class)
> public class ReadTable<T> implements SerializableFunction<SchemaAndRecord, T> {
>   private static final long serialVersionUID = 1L;
>   private static Gson gson = new Gson();
>   public static final Logger LOG = LoggerFactory.getLogger(ReadTable.class);
>   private final Counter countingRecords =
>       Metrics.counter(ReadTable.class, "Reading Records EFT Report");
>   private Class<T> class1;
>
>   public ReadTable(Class<T> class1) {
>     this.class1 = class1;
>   }
>
>   public T apply(SchemaAndRecord schemaAndRecord) {
>     Map<String, String> mapping = new HashMap<>();
>     int counter = 0;
>     try {
>       GenericRecord s = schemaAndRecord.getRecord();
>       org.apache.avro.Schema s1 = s.getSchema();
>       for (Field f : s1.getFields()) {
>         counter++;
>         mapping.put(f.name(),
>             null == s.get(f.name()) ? null : String.valueOf(s.get(counter)));
>       }
>       countingRecords.inc();
>       JsonElement jsonElement = gson.toJsonTree(mapping);
>       return gson.fromJson(jsonElement, class1);
>     } catch (Exception mp) {
>       LOG.error("Found Wrong Mapping for the Record: " + mapping);
>       mp.printStackTrace();
>       return null;
>     }
>   }
> }
> After reading the data from BigQuery, I mapped each SchemaAndRecord to the
> POJO. For columns whose data type is NUMERIC, I got values like the one
> below:
> last_update_amount=java.nio.HeapByteBuffer[pos=0 lim=16 cap=16]
> I expected to get the exact numeric value, but I get a HeapByteBuffer
> instead. I am using Apache Beam 2.12.0. If any more information is needed,
> please let me know.
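
The `java.nio.HeapByteBuffer` value quoted above is most likely the raw Avro `bytes` payload of the NUMERIC column, which `String.valueOf` prints as a buffer description rather than a number. Avro's decimal logical type stores the unscaled value as big-endian two's complement, so the bytes can be decoded with the JDK alone. The sketch below is a minimal illustration under stated assumptions: `NumericBytes` and `ASSUMED_NUMERIC_SCALE` are hypothetical names, and the scale of 9 is assumed for BigQuery NUMERIC; where the field's Avro schema is available, the scale should be read from it instead.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

/** Hypothetical helper for illustration; not part of Beam or Avro. */
final class NumericBytes {
  // Assumption: BigQuery NUMERIC columns use a fixed scale of 9 decimal digits.
  static final int ASSUMED_NUMERIC_SCALE = 9;

  /** Decodes an Avro decimal payload into a BigDecimal with the given scale. */
  static BigDecimal decode(ByteBuffer buffer, int scale) {
    // Read from a duplicate so the caller's buffer position is left untouched.
    ByteBuffer copy = buffer.duplicate();
    byte[] unscaled = new byte[copy.remaining()];
    copy.get(unscaled);
    // The bytes hold the unscaled integer as big-endian two's complement.
    return new BigDecimal(new BigInteger(unscaled), scale);
  }

  public static void main(String[] args) {
    // 1.5 at scale 9 has the unscaled value 1_500_000_000.
    ByteBuffer sample =
        ByteBuffer.wrap(BigInteger.valueOf(1_500_000_000L).toByteArray());
    System.out.println(decode(sample, ASSUMED_NUMERIC_SCALE)); // prints 1.500000000
  }
}
```

In a `SerializableFunction<SchemaAndRecord, T>` this would be applied to the `ByteBuffer` returned for the NUMERIC field before the value is placed into the mapping.
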
> Second approach tried:
> GenericRecord s = schemaAndRecord.getRecord();
> org.apache.avro.Schema s1 = s.getSchema();
> for (Field f : s1.getFields()) {
>   counter++;
>   mapping.put(f.name(),
>       null == s.get(f.name()) ? null : String.valueOf(s.get(counter)));
>   if (f.name().equalsIgnoreCase("reason_code_id")) {
>     BigDecimal numericValue =
>         new Conversions.DecimalConversion()
>             .fromBytes(
>                 (ByteBuffer) s.get(f.name()),
>                 Schema.create(s1.getType()),
>                 s1.getLogicalType());
>     System.out.println("Numeric Con" + numericValue);
>   } else {
>     System.out.println("Else Condition " + f.name());
>   }
> }
> Resulting error:
> 2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD
>  
> It would be great to have a method that maps BigQuery data directly onto a
> POJO schema. For example, if the table has 10 columns and my POJO needs only
> 5 of them, BigQueryIO should map just those 5 values into the Java class and
> discard the rest, which is what I currently do by hand with considerable
> effort. NUMERIC values should also be deserialized automatically when data is
> fetched, as they are with TableRow.
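
The requested behavior (populate only the columns the POJO declares and drop the rest) can be sketched with plain reflection. This is a minimal illustration of the idea, not an existing BigQueryIO API: `SubsetMapper` and `ReasonCodePojo` are hypothetical names, the row is modeled as a `Map` from column name to an already-decoded value, and POJO field names are assumed to match column names exactly.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical POJO that needs only two of the table's columns. */
final class ReasonCodePojo {
  String reason_code_id;
  BigDecimal last_update_amount;
}

/** Hypothetical helper for illustration; not part of Beam. */
final class SubsetMapper {
  /** Copies row entries whose key matches a declared field; other columns are ignored. */
  static <T> T map(Map<String, ?> row, Class<T> type) {
    try {
      Constructor<T> ctor = type.getDeclaredConstructor();
      ctor.setAccessible(true);
      T target = ctor.newInstance();
      for (Field field : type.getDeclaredFields()) {
        if (Modifier.isStatic(field.getModifiers())) {
          continue; // constants such as serialVersionUID are not columns
        }
        if (!row.containsKey(field.getName())) {
          continue; // column missing from the row: keep the field's default
        }
        field.setAccessible(true);
        // Throws IllegalArgumentException if the value type does not fit the field.
        field.set(target, row.get(field.getName()));
      }
      return target;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot map row to " + type.getName(), e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("reason_code_id", "42");
    row.put("last_update_amount", new BigDecimal("1.500000000"));
    row.put("unused_column", "ignored"); // no matching field, so it is dropped
    ReasonCodePojo pojo = map(row, ReasonCodePojo.class);
    System.out.println(pojo.reason_code_id + " " + pojo.last_update_amount);
  }
}
```

Driving the loop from the POJO's fields rather than from the row's columns is what lets extra columns fall away without any per-table configuration.
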
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
