[
https://issues.apache.org/jira/browse/BEAM-8913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jeff Webb updated BEAM-8913:
----------------------------
Resolution: Won't Fix
Status: Resolved (was: Triage Needed)
Older issue; closing.
> ParquetIO cannot read files written by itself using reflection
> --------------------------------------------------------------
>
> Key: BEAM-8913
> URL: https://issues.apache.org/jira/browse/BEAM-8913
> Project: Beam
> Issue Type: Bug
> Components: io-java-parquet
> Affects Versions: 2.15.0, 2.16.0
> Environment: Java 8, JUnit 4
> Reporter: Apoorv Gupta
> Priority: P3
>
> Apache Beam's ParquetIO is unable to read Parquet files that were written
> using an Avro Schema generated by reflection. However, it is able to read
> Parquet files that were written using a hardcoded Schema.
>
> The following test currently passes. However, it fails when 'SCHEMA' is
> replaced with 'SCHEMA_FAILS' in this test.
>
> package com.example;
> import java.io.Serializable;
> import java.lang.reflect.Field;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.reflect.ReflectData;
> import org.apache.beam.sdk.coders.AvroCoder;
> import org.apache.beam.sdk.io.FileIO;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.parquet.ParquetIO;
> import org.apache.beam.sdk.testing.PAssert;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
> import org.apache.beam.sdk.transforms.Filter;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.Values;
> import org.apache.beam.sdk.values.PCollection;
> import org.junit.Rule;
> import org.junit.Test;
> import org.junit.rules.TemporaryFolder;
> import org.junit.runner.RunWith;
> import org.junit.runners.JUnit4;
> @RunWith(JUnit4.class)
> public final class ReflectionTest {
> @Rule public transient TestPipeline pipeline = TestPipeline.create();
> @Rule public transient TemporaryFolder temporaryFolder = new
> TemporaryFolder();
> private static final Schema SCHEMA_FAILS =
> ReflectData.get().getSchema(Transaction.class);
> private static final Schema SCHEMA = new
> Schema.Parser().parse(Transaction.SCHEMA);
> /**
> * This test creates GenericRecord objects, writes them to Parquet files and
> reads them back.
> *
> * <p>However, it is able to read Parquet files only when they are written
> using a hardcoded
> * Schema (see ReflectionTest.SCHEMA defined above).
> *
> * <p>It is unable to read Parquet files when they are written using a Schema
> generated by
> * reflection (see ReflectionTest.SCHEMA_FAILS defined above).
> */
> @Test
> public void genericRecordToTableRow_convertsGenericRecordToTableRow() {
> PCollection<GenericRecord> pgr =
> pipeline
> .apply(GenerateSequence.from(0).to(2))
> .apply("translateToGeneric", ParDo.of(new LongToGenericRecord()))
> .setCoder(AvroCoder.of(SCHEMA));
> PCollection<GenericRecord> writeThenRead =
> pgr.apply(
> FileIO.<GenericRecord>write()
> .via(ParquetIO.sink(SCHEMA))
> .to(temporaryFolder.getRoot().getAbsolutePath()))
> .getPerDestinationOutputFilenames()
> .apply(Values.create())
> .apply(FileIO.matchAll())
> .apply(FileIO.readMatches())
> .apply(ParquetIO.readFiles(SCHEMA))
> .apply(Filter.by(x -> false));
> PAssert.that(writeThenRead).empty();
> pipeline.run().waitUntilFinish();
> }
> static class LongToGenericRecord extends DoFn<Long, GenericRecord> {
> @ProcessElement
> public void processElement(ProcessContext context) {
> Transaction tr = new Transaction(context.element());
> GenericRecord result = new GenericData.Record(SCHEMA);
> for (Schema.Field r : SCHEMA.getFields()) {
> String name = r.name();
> try {
> Field f = Transaction.class.getDeclaredField(name);
> f.setAccessible(true);
> result.put(name, f.get(tr));
> } catch (NoSuchFieldException nsfe) {
> throw new RuntimeException("no such field: " + name, nsfe);
> } catch (IllegalAccessException iae) {
> throw new RuntimeException("no access to field: " + name, iae);
> }
> }
> context.output(result);
> }
> }
> /** represents a row in our generated data */
> public static final class Transaction implements Serializable {
> double amountBase = 0.0;
> public Transaction(double amt) {
> amountBase = amt;
> }
> double getAmountBase() {
> return amountBase;
> }
> public static final String SCHEMA =
> "{\n"
> + " \"namespace\": \"sample\",\n"
> + " \"type\": \"record\",\n"
> + " \"name\": \"Transaction\",\n"
> + " \"fields\": [\n"
> + " \{\"name\": \"amountBase\", \"type\": \"double\"}\n"
> + " ]\n"
> + "}";
> public boolean equals(Object t) {
> if (t instanceof Transaction) {
> return ((Transaction) t).getAmountBase() == amountBase;
> } else {
> return false;
> }
> }
> public int hashCode() {
> return (int) amountBase;
> }
> }
> }
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)