[ 
https://issues.apache.org/jira/browse/BEAM-7425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Kedin updated BEAM-7425:
------------------------------
    Description: 
While developing my pipeline, I used the code snippet below to read table data 
from BigQuery.

 
{code:java}
PCollection<ReasonCode> gpseEftReasonCodes = input
      .apply("Reading xxyyzz", 
          BigQueryIO
                  .read(new ReadTable<ReasonCode>(ReasonCode.class))
                  .withoutValidation()
                  .withTemplateCompatibility()
                  .fromQuery("Select * from dataset.xxyyzz")
                  .usingStandardSql()
                  .withCoder(SerializableCoder.of(xxyyzz.class)));
{code}

Read Table Class:

{code:java}

@DefaultSchema(JavaBeanSchema.class)
public class ReadTable<T> implements SerializableFunction<SchemaAndRecord, T> {
  private static final long serialVersionUID = 1L;
  private static Gson gson = new Gson();
  public static final Logger LOG = LoggerFactory.getLogger(ReadTable.class);
  private final Counter countingRecords =
      Metrics.counter(ReadTable.class, "Reading Records EFT Report");
  private Class<T> class1;

  public ReadTable(Class<T> class1) { this.class1 = class1; }

  public T apply(SchemaAndRecord schemaAndRecord) {
    Map<String, String> mapping = new HashMap<>();
    int counter = 0;
    try {
      GenericRecord s = schemaAndRecord.getRecord();
      org.apache.avro.Schema s1 = s.getSchema();
      for (Field f : s1.getFields()) {
        counter++;
        mapping.put(f.name(),
            null == s.get(f.name()) ? null : String.valueOf(s.get(counter)));
      }
      countingRecords.inc();
      JsonElement jsonElement = gson.toJsonTree(mapping);
      return gson.fromJson(jsonElement, class1);
    } catch (Exception mp) {
      LOG.error("Found Wrong Mapping for the Record: " + mapping);
      mp.printStackTrace();
      return null;
    }
  }
}

{code}

After reading the data from BigQuery, I map each SchemaAndRecord to a POJO. 
For columns whose data type is NUMERIC, I get the value shown below instead of 
the number.

{code}
last_update_amount=java.nio.HeapByteBuffer[pos=0 lim=16 cap=16]
{code}

I expected to get the exact numeric value, but I get a HeapByteBuffer instead. 
I am using Apache Beam 2.12.0. If any more information is needed, please let 
me know.

Second approach tried:

{code:java}
GenericRecord s = schemaAndRecord.getRecord();
org.apache.avro.Schema s1 = s.getSchema();
for (Field f : s1.getFields()) {
  counter++;
  mapping.put(f.name(),
      null == s.get(f.name()) ? null : String.valueOf(s.get(counter)));
  if (f.name().equalsIgnoreCase("reason_code_id")) {
    BigDecimal numericValue = new Conversions.DecimalConversion()
        .fromBytes((ByteBuffer) s.get(f.name()), Schema.create(s1.getType()),
            s1.getLogicalType());
    System.out.println("Numeric Con" + numericValue);
  } else {
    System.out.println("Else Condition " + f.name());
  }
}
{code}

Error encountered:

{code}
2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD
{code}
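
The error above most likely comes from `Schema.create(s1.getType())`: `s1` is the record's own schema, so its type is RECORD, and Avro's `Schema.create` only builds primitive schemas. As a minimal, dependency-free sketch of what the decimal conversion has to do, assuming the NUMERIC bytes follow Avro's decimal encoding (a big-endian two's-complement unscaled value) and BigQuery's fixed NUMERIC scale of 9, the value can be rebuilt as shown here. The class and method names (`NumericBytes`, `decode`) are hypothetical and are not part of Beam or Avro.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical helper, not a Beam or Avro API.
final class NumericBytes {
  // Assumption: BigQuery NUMERIC uses a fixed scale of 9 decimal digits.
  static final int BIGQUERY_NUMERIC_SCALE = 9;

  /** Rebuilds a decimal from a big-endian two's-complement unscaled value. */
  static BigDecimal decode(ByteBuffer buffer, int scale) {
    // Read from a duplicate so the caller's position and limit stay untouched.
    ByteBuffer copy = buffer.duplicate();
    byte[] unscaled = new byte[copy.remaining()];
    copy.get(unscaled);
    return new BigDecimal(new BigInteger(unscaled), scale);
  }

  public static void main(String[] args) {
    // 123.45 at scale 9 has the unscaled value 123450000000.
    ByteBuffer sample = ByteBuffer.wrap(new BigInteger("123450000000").toByteArray());
    System.out.println(decode(sample, BIGQUERY_NUMERIC_SCALE));
  }
}
```

Inside `ReadTable.apply`, a helper like this could be called on the `ByteBuffer` returned by `s.get(f.name())` for the NUMERIC columns, before the value is put into the map. Whether the scale is really 9 for a given column should be checked against that field's own Avro schema.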
 

It would be great to have a method that maps BigQuery data directly onto a 
POJO schema. For example, if the BigQuery table has 10 columns and my POJO 
needs only 5 of them, BigQueryIO should map just those 5 values into the Java 
class and discard the rest, which is what I currently do by hand with 
considerable effort. NUMERIC values should also be deserialized automatically 
when fetching data, as they are with TableRow.
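
To illustrate the requested behavior, here is a minimal sketch that copies only the columns a POJO declares and ignores the rest. It is not an existing BigQueryIO feature: `PojoMapper`, `fromColumns`, and the two fields on the example `ReasonCode` class are hypothetical, and the row is modeled as a plain `Map` of column name to already-decoded value.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.util.Map;

// Hypothetical helper, not a Beam API.
final class PojoMapper {
  /** Example POJO that needs only two of the table's columns (fields assumed). */
  static final class ReasonCode {
    String reason_code_id;
    BigDecimal last_update_amount;
  }

  /** Copies values whose column name matches a declared field; other columns are skipped. */
  static <T> T fromColumns(Map<String, Object> row, Class<T> type) {
    try {
      Constructor<T> ctor = type.getDeclaredConstructor();
      ctor.setAccessible(true);
      T target = ctor.newInstance();
      for (Field field : type.getDeclaredFields()) {
        // Ignore constants and compiler-generated fields.
        if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
          continue;
        }
        // Fields without a matching column keep their default value.
        if (!row.containsKey(field.getName())) {
          continue;
        }
        field.setAccessible(true);
        field.set(target, row.get(field.getName()));
      }
      return target;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot map row to " + type.getName(), e);
    }
  }
}
```

A value whose type does not match the field's type makes `Field.set` throw an `IllegalArgumentException`, so NUMERIC columns would need to be converted to `BigDecimal` before the row reaches this method.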

 



> Reading BigQuery Table Data into Java Classes(Pojo) Directly
> ------------------------------------------------------------
>
>                 Key: BEAM-7425
>                 URL: https://issues.apache.org/jira/browse/BEAM-7425
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>    Affects Versions: 2.12.0
>         Environment: Dataflow
>            Reporter: Kishan Kumar
>            Priority: Major
>              Labels: newbie, patch
>             Fix For: 2.14.0
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
