NathanEckert opened a new issue, #2975:
URL: https://github.com/apache/parquet-java/issues/2975
### Describe the bug, including details regarding any error messages, version, and platform.
When upgrading `parquet-avro` from 1.13.1 to 1.14.1, reading a Parquet file fails with:
```
java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to org.myorg.parquet.TestUpgradeParquet$CustomParquetRecord
```
I get a different error when upgrading only to `1.14.0`:
```
java.lang.RuntimeException: shaded.parquet.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 optional type `java.util.Optional<java.lang.Long>` not supported by default
```
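For context on that second message: it is the standard Jackson error for handling `java.util.Optional` without the jdk8 datatype module registered. The snippet below is only an illustration of what the message means, using a plain (unshaded) Jackson; the Jackson inside parquet 1.14.0 is shaded and bundled, so, as far as I can tell, user code cannot register a module on it.

```java
// Illustration only, with a plain unshaded Jackson (not the shaded copy inside parquet):
// it reproduces the same "Java 8 optional type ... not supported by default" message
// and shows what registering the jdk8 module changes.
import java.util.Optional;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

class Jdk8OptionalIllustration {
  public static void main(String[] args) throws Exception {
    // Without the module, the next line throws InvalidDefinitionException:
    // Java 8 optional type `java.util.Optional<java.lang.Long>` not supported by default
    // new ObjectMapper().writeValueAsString(Optional.of(1L));

    // With the module registered, the same call works and prints "1"
    final ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module());
    System.out.println(mapper.writeValueAsString(Optional.of(1L)));
  }
}
```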
I managed to reproduce it with the following code:
```java
package org.myorg.parquet;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import lombok.Cleanup;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificData.SchemaConstructable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
 * Test upgrade parquet.
 *
 * <p>Works with org.apache.parquet:parquet-avro:1.13.1 (failure with simplified example,
 * but different stacktrace than the other ones, as it fails later)</p>
 *
 * <p>Fails with org.apache.parquet:parquet-avro:1.14.0 with
 * shaded.parquet.com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
 * Java 8 optional type `java.util.Optional<java.lang.Long>` not supported by default: add Module ...</p>
 *
 * <p>Fails with org.apache.parquet:parquet-avro:1.14.1 with java.lang.ClassCastException:
 * class org.apache.avro.generic.GenericData$Record cannot be cast to class
 * com.activeviam.source.parquet.api.IParquetRecord</p>
 */
class TestUpgradeParquet {
  private static final Path TEMP_FOLDER =
      Paths.get(System.getProperty("java.io.tmpdir"), "directoryForTest");
  private static final Path TEMP_FILE =
      Paths.get(TEMP_FOLDER.toString(), "source.parquet");
private Path parquetFilePath;
@BeforeEach
public void createParquet() throws Exception {
    // Cleanup of a previous test run (the folder may not exist yet on the first run)
    if (Files.exists(TEMP_FOLDER)) {
      try (var previousFiles = Files.list(TEMP_FOLDER)) {
        previousFiles.forEach(path -> {
          try {
            Files.delete(path);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        });
      }
      Files.deleteIfExists(TEMP_FOLDER);
    }
Thread.sleep(Duration.ofSeconds(1));
// Actually create the folder and file for this test
Files.createDirectory(TEMP_FOLDER);
this.parquetFilePath = Files.createFile(TEMP_FILE);
final Schema schema =
SchemaBuilder.record("simpleSchema")
.fields()
.name("Key")
.type()
.intType()
.noDefault()
.name("FieldA")
.type()
.doubleType()
.noDefault()
.endRecord();
final Record record1 = new Record(schema);
record1.put(0, 1);
record1.put(1, 2.0);
final Record record2 = new Record(schema);
record2.put(0, 2);
record2.put(1, 5.0);
final Collection<Record> recordsToWrite = List.of(record1, record2);
writeParquetFile(
this.parquetFilePath,
schema,
recordsToWrite,
new Configuration(),
CompressionCodecName.UNCOMPRESSED);
}
@Test
void testUpgradeParquet() throws Exception {
    final org.apache.hadoop.fs.Path filepath =
        new org.apache.hadoop.fs.Path(this.parquetFilePath.toString());
final Configuration configuration = new Configuration();
@Cleanup
final ParquetReader<CustomParquetRecord> parquetReader = ParquetReader
.builder(new CustomReadSupport(), filepath)
.withConf(configuration)
.build();
final var pivotRecord = parquetReader.read(); // Failure here
assertThat(pivotRecord).isNotNull();
}
  private interface CustomParquetRecord extends GenericRecord, SchemaConstructable {
}
  private static class CustomReadSupport extends AvroReadSupport<CustomParquetRecord> {
@Override
public RecordMaterializer<CustomParquetRecord> prepareForRead(
Configuration configuration,
Map<String, String> keyValueMetaData,
MessageType fileSchema,
ReadContext readContext) {
      final RecordMaterializer<CustomParquetRecord> rMaterializer =
          super.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext);
return new RecordMaterializer<>() {
@Override
public CustomParquetRecord getCurrentRecord() {
return rMaterializer.getCurrentRecord();
}
@Override
public GroupConverter getRootConverter() {
          // Difference here with actual code: we return a custom root converter.
          // It is not included in this reproducer as it would bring way too much code.
return null;
}
};
}
}
/**
* Creates a parquet file with contents.
*
* @param filePath filePath where the file is created
* @param schema schema of the data
* @param recordsToWrite records to write
* @param configuration the hadoop configuration to use for the writer
   * @param compressionCodec the codec for the compression scheme to use for the parquet file
*/
private static void writeParquetFile(
final Path filePath,
final Schema schema,
final Collection<Record> recordsToWrite,
final Configuration configuration,
final CompressionCodecName compressionCodec)
throws IOException, InterruptedException {
try {
Files.deleteIfExists(filePath);
} catch (final NoSuchFileException x) {
throw new RuntimeException(
String.format("%s: no such file or directory.", filePath), x);
} catch (final DirectoryNotEmptyException x) {
throw new RuntimeException(String.format("%s not empty.", filePath),
x);
} catch (final IOException x) {
throw new RuntimeException(x);
}
final org.apache.hadoop.fs.Path path =
new org.apache.hadoop.fs.Path(filePath.toAbsolutePath().toString());
RecordWriter<Void, Record> writer = null;
try {
writer =
new ParquetOutputFormat<Record>(
new AvroWriteSupport<>(
new AvroSchemaConverter(configuration).convert(schema),
schema,
SpecificData.get()))
.getRecordWriter(configuration, path, compressionCodec,
Mode.CREATE);
for (final Record record : recordsToWrite) {
writer.write(null, record);
}
} finally {
if (writer != null) {
writer.close(null);
}
}
}
}
```
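Separately from the reproducer above, a plain `GenericRecord` read with the default Avro read support can serve as a baseline to check that the file itself is readable. A minimal sketch (the `Path`-based builder is deprecated in recent releases but, as far as I know, still available):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

class BaselineReadSketch {
  /** Reads the first record of the given parquet file with the default Avro read support. */
  static GenericRecord readFirst(final String parquetFile) throws Exception {
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(new Path(parquetFile)).build()) {
      return reader.read();
    }
  }
}
```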
N.B.: The reproducer also throws when using `1.13.1`, but that is expected; I didn't want to complicate it with more code.
You will get this error:
```
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/directoryForTest/source.parquet
```
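In case it helps with triage: one knob that looks related to the `GenericData$Record` cast failure is the Avro data model used by `AvroReadSupport`, which can be forced through `AvroReadSupport.setAvroDataSupplier`. Below is a minimal sketch; I have not verified that this actually avoids the 1.14.x ClassCastException, so treat it as an assumption rather than a workaround.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.SpecificDataSupplier;

class DataSupplierSketch {
  /** Builds a reader Configuration that asks AvroReadSupport for SpecificData instead of GenericData. */
  static Configuration readerConfiguration() {
    final Configuration configuration = new Configuration();
    // Forces the Avro data model used when materializing records
    // (unverified whether this changes the behaviour reported above).
    AvroReadSupport.setAvroDataSupplier(configuration, SpecificDataSupplier.class);
    return configuration;
  }
}
```

The returned configuration would then be passed to the reader via `withConf(...)` in the test above.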
### Component(s)
_No response_