Kishan Kumar created BEAM-7425:
----------------------------------

             Summary: Reading BigQuery Table Data in form of Java Classes (POJO)
                 Key: BEAM-7425
                 URL: https://issues.apache.org/jira/browse/BEAM-7425
             Project: Beam
          Issue Type: New Feature
          Components: beam-model
    Affects Versions: 2.12.0
         Environment: Dataflow
            Reporter: Kishan Kumar
             Fix For: 2.13.0


While developing my code I used the below snippet to read the table data from BigQuery:

{code:java}
PCollection<ReasonCode> gpseEftReasonCodes =
    input.apply(
        "Reading xxyyzz",
        BigQueryIO.read(new ReadTable<ReasonCode>(ReasonCode.class))
            .withoutValidation()
            .withTemplateCompatibility()
            .fromQuery("Select * from dataset.xxyyzz")
            .usingStandardSql()
            .withCoder(SerializableCoder.of(ReasonCode.class)));
{code}

*Read Table Class:*

{code:java}
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefaultSchema(JavaBeanSchema.class)
public class ReadTable<T> implements SerializableFunction<SchemaAndRecord, T> {

  private static final long serialVersionUID = 1L;
  private static Gson gson = new Gson();
  public static final Logger LOG = LoggerFactory.getLogger(ReadTable.class);
  private final Counter countingRecords =
      Metrics.counter(ReadTable.class, "Reading Records EFT Report");
  private final Class<T> class1;

  public ReadTable(Class<T> class1) {
    this.class1 = class1;
  }

  @Override
  public T apply(SchemaAndRecord schemaAndRecord) {
    Map<String, String> mapping = new HashMap<>();
    try {
      GenericRecord s = schemaAndRecord.getRecord();
      org.apache.avro.Schema s1 = s.getSchema();
      for (Field f : s1.getFields()) {
        // Copy every column into the map, keyed by field name.
        mapping.put(
            f.name(),
            s.get(f.name()) == null ? null : String.valueOf(s.get(f.name())));
      }
      countingRecords.inc();
      // Let Gson turn the map into the target POJO.
      JsonElement jsonElement = gson.toJsonTree(mapping);
      return gson.fromJson(jsonElement, class1);
    } catch (Exception mp) {
      LOG.error("Found wrong mapping for the record: " + mapping);
      mp.printStackTrace();
      return null;
    }
  }
}
{code}

After reading the data from BigQuery and mapping the SchemaAndRecord to my POJO, the value I got for columns whose data type is NUMERIC is the one mentioned below:

 *{{last_update_amount=java.nio.HeapByteBuffer[pos=0 lim=16 cap=16]}}* 

My expectation was that I would get the exact value, but instead I am getting the HeapByteBuffer. The version I am using is Apache Beam 2.12.0. If any more information is needed then please let me know.
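
The ByteBuffer appears to hold the Avro decimal encoding that BigQuery's Avro export uses for NUMERIC columns (a two's-complement big-endian unscaled value with a fixed scale of 9). A minimal sketch of decoding such a buffer by hand, assuming that encoding (the class and method names below are made up for illustration):

{code:java}
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class NumericDecodeSketch {

  // BigQuery's Avro export encodes NUMERIC as the two's-complement big-endian
  // unscaled value of a decimal with scale 9; re-apply that scale here.
  static BigDecimal decodeNumeric(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes); // copy without moving the original position
    return new BigDecimal(new BigInteger(bytes), 9);
  }

  public static void main(String[] args) {
    // Unscaled value 1234500000000 with scale 9 is 1234.5.
    ByteBuffer raw = ByteBuffer.wrap(BigInteger.valueOf(1_234_500_000_000L).toByteArray());
    System.out.println(decodeNumeric(raw)); // prints 1234.500000000
  }
}
{code}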

Second approach I tried:

{code:java}
GenericRecord s = schemaAndRecord.getRecord();
org.apache.avro.Schema s1 = s.getSchema();
for (Field f : s1.getFields()) {
  mapping.put(
      f.name(),
      s.get(f.name()) == null ? null : String.valueOf(s.get(f.name())));
  if (f.name().equalsIgnoreCase("reason_code_id")) {
    BigDecimal numericValue =
        new Conversions.DecimalConversion()
            .fromBytes(
                (ByteBuffer) s.get(f.name()),
                // s1 is the record schema, so s1.getType() is RECORD here.
                Schema.create(s1.getType()),
                s1.getLogicalType());
    System.out.println("Numeric Con" + numericValue);
  } else {
    System.out.println("Else Condition " + f.name());
  }
}
{code}

Facing issue:

{code}
2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD
{code}
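
The exception seems to come from Schema.create(s1.getType()): s1 is the record schema, so its type is RECORD, and Avro's Schema.create(...) only accepts primitive types. The decimal logical type lives on the field's own schema instead. A sketch of what the conversion might look like using the field schema, assuming the NUMERIC column is exported as (possibly nullable) bytes with a decimal logical type (the helper name readNumeric is made up):

{code:java}
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

public class DecimalFieldSketch {

  static BigDecimal readNumeric(GenericRecord record, String fieldName) {
    Schema fieldSchema = record.getSchema().getField(fieldName).schema();
    // Nullable columns are exported as a union ["null", bytes-with-decimal],
    // so unwrap the union to reach the schema that carries the logical type.
    if (fieldSchema.getType() == Schema.Type.UNION) {
      for (Schema member : fieldSchema.getTypes()) {
        if (member.getType() != Schema.Type.NULL) {
          fieldSchema = member;
          break;
        }
      }
    }
    Object value = record.get(fieldName);
    if (value == null) {
      return null;
    }
    return new Conversions.DecimalConversion()
        .fromBytes((ByteBuffer) value, fieldSchema, fieldSchema.getLogicalType());
  }
}
{code}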

 

It would be great if we had a method which maps all the BigQuery data onto the POJO schema. That means, if I have 10 columns in BigQuery and my POJO needs only 5 of them, BigQueryIO should map only those 5 values and drop the rest, instead of me doing that after so much effort. The NUMERIC data type should also be deserialized by itself while fetching the data, as it is with TableRow.
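
Until BigQueryIO offers something like that, a rough user-land sketch of the requested behaviour might look like the following (the PojoMapper name is made up; it reuses the Gson approach above and relies on Gson ignoring JSON keys that have no matching POJO field, plus the decimal handling sketched earlier):

{code:java}
import com.google.gson.Gson;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;

/** Hypothetical helper: maps a SchemaAndRecord onto any POJO, decoding NUMERIC on the way. */
public class PojoMapper<T> {

  private static final Gson GSON = new Gson();
  private final Class<T> pojoClass;

  public PojoMapper(Class<T> pojoClass) {
    this.pojoClass = pojoClass;
  }

  public T map(SchemaAndRecord schemaAndRecord) {
    GenericRecord record = schemaAndRecord.getRecord();
    Map<String, Object> values = new HashMap<>();
    for (Schema.Field field : record.getSchema().getFields()) {
      Object value = record.get(field.name());
      Schema fieldSchema = unwrapNullable(field.schema());
      if (value instanceof ByteBuffer
          && fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
        // NUMERIC columns: decode the byte buffer into a readable BigDecimal.
        BigDecimal decoded =
            new Conversions.DecimalConversion()
                .fromBytes((ByteBuffer) value, fieldSchema, fieldSchema.getLogicalType());
        values.put(field.name(), decoded);
      } else {
        values.put(field.name(), value == null ? null : String.valueOf(value));
      }
    }
    // Gson ignores keys that have no matching field, so a 10-column record
    // fills only the 5 fields the POJO actually declares.
    return GSON.fromJson(GSON.toJsonTree(values), pojoClass);
  }

  private static Schema unwrapNullable(Schema schema) {
    if (schema.getType() == Schema.Type.UNION) {
      for (Schema member : schema.getTypes()) {
        if (member.getType() != Schema.Type.NULL) {
          return member;
        }
      }
    }
    return schema;
  }
}
{code}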

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
