[
https://issues.apache.org/jira/browse/BEAM-7425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kishan Kumar updated BEAM-7425:
-------------------------------
Description:
While developing my pipeline I used the snippet below to read table data from BigQuery.

```java
PCollection<ReasonCode> gpseEftReasonCodes = input.apply("Reading xxyyzz",
    BigQueryIO.read(new ReadTable<ReasonCode>(ReasonCode.class))
        .withoutValidation()
        .withTemplateCompatibility()
        .fromQuery("SELECT * FROM dataset.xxyyzz")
        .usingStandardSql()
        .withCoder(SerializableCoder.of(ReasonCode.class)));
```
Read Table class:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.JsonElement;

@DefaultSchema(JavaBeanSchema.class)
public class ReadTable<T> implements SerializableFunction<SchemaAndRecord, T> {
  private static final long serialVersionUID = 1L;
  private static final Gson gson = new Gson();
  public static final Logger LOG = LoggerFactory.getLogger(ReadTable.class);
  private final Counter countingRecords =
      Metrics.counter(ReadTable.class, "Reading Records EFT Report");
  private final Class<T> class1;

  public ReadTable(Class<T> class1) {
    this.class1 = class1;
  }

  @Override
  public T apply(SchemaAndRecord schemaAndRecord) {
    Map<String, String> mapping = new HashMap<>();
    try {
      GenericRecord record = schemaAndRecord.getRecord();
      for (Field f : record.getSchema().getFields()) {
        // Look fields up by name; indexing by a running counter reads the wrong column.
        Object value = record.get(f.name());
        mapping.put(f.name(), value == null ? null : String.valueOf(value));
      }
      countingRecords.inc();
      JsonElement jsonElement = gson.toJsonTree(mapping);
      return gson.fromJson(jsonElement, class1);
    } catch (Exception e) {
      LOG.error("Found wrong mapping for the record: " + mapping, e);
      return null;
    }
  }
}
```
After reading the data from BigQuery and mapping each SchemaAndRecord onto my POJO, columns whose data type is NUMERIC came back as raw byte buffers, for example:

```
last_update_amount=java.nio.HeapByteBuffer[pos=0 lim=16 cap=16]
```

I expected the exact numeric value, but I get the HeapByteBuffer instead. The version I am using is Apache Beam 2.12.0. If any more information is needed, please let me know.
Way 2 tried:

```java
GenericRecord s = schemaAndRecord.getRecord();
org.apache.avro.Schema s1 = s.getSchema();
for (Field f : s1.getFields()) {
  counter++;
  mapping.put(f.name(), null == s.get(f.name()) ? null : String.valueOf(s.get(counter)));
  if (f.name().equalsIgnoreCase("reason_code_id")) {
    BigDecimal numericValue = new Conversions.DecimalConversion()
        .fromBytes((ByteBuffer) s.get(f.name()), Schema.create(s1.getType()), s1.getLogicalType());
    System.out.println("Numeric Con" + numericValue);
  } else {
    System.out.println("Else Condition " + f.name());
  }
}
```
Facing issue:

```
2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD
```
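For what it's worth, the exception above appears to come from Schema.create(s1.getType()): s1 is the record's schema, so its type is RECORD, and Schema.create cannot build composite types; DecimalConversion.fromBytes expects the field's own schema (f.schema()) and that schema's logical type instead. Separately, since BigQuery exports NUMERIC as an Avro decimal logical type holding the unscaled value in big-endian two's-complement bytes at scale 9, the buffer can also be decoded with the JDK alone. A minimal sketch, assuming that scale-9 encoding (the class and method names here are mine, not Beam's):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class NumericDecode {
    // BigQuery NUMERIC arrives as the unscaled decimal value in big-endian
    // two's-complement bytes; attaching scale 9 recovers the exact number.
    static BigDecimal decodeNumeric(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes); // read a copy so the buffer's position is untouched
        return new BigDecimal(new BigInteger(bytes), 9);
    }

    public static void main(String[] args) {
        // 1234.500000000 has unscaled value 1234500000000 at scale 9
        ByteBuffer buf = ByteBuffer.wrap(BigInteger.valueOf(1_234_500_000_000L).toByteArray());
        System.out.println(decodeNumeric(buf)); // prints 1234.500000000
    }
}
```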
It would be great to have a method that maps BigQuery data onto a POJO schema directly: if the table has 10 columns and my POJO declares only 5, BigQueryIO should map just those 5 values and discard the rest, which is what I am doing now only after much effort. NUMERIC values should also be deserialized automatically while fetching the data, as they are with TableRow.
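As a rough illustration of the behaviour requested above (this is not a Beam API; mapToPojo and the toy ReasonCode below are hypothetical, and string-typed fields are assumed for brevity), mapping only the columns a POJO declares and silently dropping the rest could look like:

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class PojoMapper {
    // Copy only the columns whose names match a declared field on the
    // target POJO; columns with no matching field are simply ignored.
    static <T> T mapToPojo(Map<String, String> columns, Class<T> type) {
        try {
            T instance = type.getDeclaredConstructor().newInstance();
            for (Field f : type.getDeclaredFields()) {
                String value = columns.get(f.getName());
                if (value != null) {
                    f.setAccessible(true);
                    f.set(instance, value); // assumes String fields for brevity
                }
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // Toy POJO declaring 2 of the table's columns; the rest are dropped.
    public static class ReasonCode {
        public String reason_code_id;
        public String last_update_amount;
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("reason_code_id", "42");
        row.put("last_update_amount", "1234.500000000");
        row.put("ignored_column", "x");
        ReasonCode rc = mapToPojo(row, ReasonCode.class);
        System.out.println(rc.reason_code_id + " " + rc.last_update_amount);
    }
}
```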
> Reading BigQuery Table Data into Java Classes(Pojo) Directly
> ------------------------------------------------------------
>
> Key: BEAM-7425
> URL: https://issues.apache.org/jira/browse/BEAM-7425
> Project: Beam
> Issue Type: New Feature
> Components: beam-model
> Affects Versions: 2.12.0
> Environment: Dataflow
> Reporter: Kishan Kumar
> Priority: Major
> Labels: newbie, patch
> Fix For: 2.13.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)