[ 
https://issues.apache.org/jira/browse/BEAM-8913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Webb updated BEAM-8913:
----------------------------
    Resolution: Won't Fix
        Status: Resolved  (was: Triage Needed)

Older issue; closing.

 

> ParquetIO cannot read files written by itself using reflection
> --------------------------------------------------------------
>
>                 Key: BEAM-8913
>                 URL: https://issues.apache.org/jira/browse/BEAM-8913
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-parquet
>    Affects Versions: 2.15.0, 2.16.0
>         Environment: Java 8, JUnit 4
>            Reporter: Apoorv Gupta
>            Priority: P3
>
> Apache Beam is unable to read Parquet files when they are written using a 
> Schema generated by reflection. However, it is able to read Parquet files 
> when they are written using a hardcoded Schema.
>  
> The following test passes right now. However, it fails when 'SCHEMA' is 
> replaced with 'SCHEMA_FAILS' in this test.
>  
> package com.example;
> import java.io.Serializable;
> import java.lang.reflect.Field;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.reflect.ReflectData;
> import org.apache.beam.sdk.coders.AvroCoder;
> import org.apache.beam.sdk.io.FileIO;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.parquet.ParquetIO;
> import org.apache.beam.sdk.testing.PAssert;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
> import org.apache.beam.sdk.transforms.Filter;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.Values;
> import org.apache.beam.sdk.values.PCollection;
> import org.junit.Rule;
> import org.junit.Test;
> import org.junit.rules.TemporaryFolder;
> import org.junit.runner.RunWith;
> import org.junit.runners.JUnit4;
> @RunWith(JUnit4.class)
> public final class ReflectionTest {
>  @Rule public transient TestPipeline pipeline = TestPipeline.create();
> @Rule public transient TemporaryFolder temporaryFolder = new 
> TemporaryFolder();
> private static final Schema SCHEMA_FAILS = 
> ReflectData.get().getSchema(Transaction.class);
>  private static final Schema SCHEMA = new 
> Schema.Parser().parse(Transaction.SCHEMA);
> /**
>  * This test creates GenericRecord objects, writes them to Parquet files and 
> reads them back.
>  *
>  * <p>However, it is able to read Parquet files only when they are written 
> using a hardcoded
>  * Schema (see ReflectionTest.SCHEMA defined above).
>  *
>  * <p>It is unable to read Parquet files when they are written using a Schema 
> generated by
>  * reflection (see ReflectionTest.SCHEMA_FAILS defined above).
>  */
>  @Test
>  public void genericRecordToTableRow_convertsGenericRecordToTableRow() {
>  PCollection<GenericRecord> pgr =
>  pipeline
>  .apply(GenerateSequence.from(0).to(2))
>  .apply("translateToGeneric", ParDo.of(new LongToGenericRecord()))
>  .setCoder(AvroCoder.of(SCHEMA));
> PCollection<GenericRecord> writeThenRead =
>  pgr.apply(
>  FileIO.<GenericRecord>write()
>  .via(ParquetIO.sink(SCHEMA))
>  .to(temporaryFolder.getRoot().getAbsolutePath()))
>  .getPerDestinationOutputFilenames()
>  .apply(Values.create())
>  .apply(FileIO.matchAll())
>  .apply(FileIO.readMatches())
>  .apply(ParquetIO.readFiles(SCHEMA))
>  .apply(Filter.by(x -> false));
> PAssert.that(writeThenRead).empty();
>  pipeline.run().waitUntilFinish();
>  }
> static class LongToGenericRecord extends DoFn<Long, GenericRecord> {
>  @ProcessElement
>  public void processElement(ProcessContext context) {
>  Transaction tr = new Transaction(context.element());
>  GenericRecord result = new GenericData.Record(SCHEMA);
>  for (Schema.Field r : SCHEMA.getFields()) {
>  String name = r.name();
>  try {
>  Field f = Transaction.class.getDeclaredField(name);
>  f.setAccessible(true);
>  result.put(name, f.get(tr));
>  } catch (NoSuchFieldException nsfe) {
>  throw new RuntimeException("no such field: " + name, nsfe);
>  } catch (IllegalAccessException iae) {
>  throw new RuntimeException("no access to field: " + name, iae);
>  }
>  }
>  context.output(result);
>  }
>  }
> /** represents a row in our generated data */
>  public static final class Transaction implements Serializable {
> double amountBase = 0.0;
> public Transaction(double amt) {
>  amountBase = amt;
>  }
> double getAmountBase() {
>  return amountBase;
>  }
> public static final String SCHEMA =
>  "{\n"
>  + " \"namespace\": \"sample\",\n"
>  + " \"type\": \"record\",\n"
>  + " \"name\": \"Transaction\",\n"
>  + " \"fields\": [\n"
>  + " {\"name\": \"amountBase\", \"type\": \"double\"}\n"
>  + " ]\n"
>  + "}";
> public boolean equals(Object t) {
>  if (t instanceof Transaction) {
>  return ((Transaction) t).getAmountBase() == amountBase;
>  } else {
>  return false;
>  }
>  }
> public int hashCode() {
>  return (int) amountBase;
>  }
>  }
> }
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to