[
https://issues.apache.org/jira/browse/PARQUET-1407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692345#comment-16692345
]
ASF GitHub Bot commented on PARQUET-1407:
-----------------------------------------
rdblue closed pull request #552: PARQUET-1407: Data loss on duplicate values with AvroParquetWriter/Reader
URL: https://github.com/apache/parquet-mr/pull/552
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
index 817f07430..cc49cc200 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
@@ -76,9 +76,13 @@ public void setDictionary(Dictionary dictionary) {
}
}
+ public T prepareDictionaryValue(T value) {
+ return value;
+ }
+
@Override
public void addValueFromDictionary(int dictionaryId) {
- parent.add(dict[dictionaryId]);
+ parent.add(prepareDictionaryValue(dict[dictionaryId]));
}
}
@@ -220,6 +224,11 @@ public FieldByteBufferConverter(ParentValueContainer parent) {
public ByteBuffer convert(Binary binary) {
return ByteBuffer.wrap(binary.getBytes());
}
+
+ @Override
+ public ByteBuffer prepareDictionaryValue(ByteBuffer value) {
+ return value.duplicate();
+ }
}
static final class FieldStringConverter extends BinaryConverter<String> {
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 2335e364c..69a73cb06 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import java.io.File;
+import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
@@ -87,10 +89,7 @@ public void testEmptyArray() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("array.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -117,10 +116,7 @@ public void testEmptyMap() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -147,10 +143,7 @@ public void testMapWithNulls() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map_with_nulls.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -182,10 +175,7 @@ public void testMapRequiredValueWithNull() throws Exception {
schema.setFields(Lists.newArrayList(
new Schema.Field("mymap",
Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -209,10 +199,7 @@ public void testMapWithUtf8Key() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -346,11 +333,8 @@ public void testAll() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("all.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
-
+ Path file = new Path(createTempFile().getPath());
+
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
.withSchema(schema)
@@ -429,10 +413,7 @@ public void testAll() throws Exception {
@Test
public void testAllUsingDefaultAvroSchema() throws Exception {
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
// write file using Parquet APIs
ParquetWriter<Map<String, Object>> parquetWriter = new ParquetWriter<Map<String, Object>>(file,
@@ -654,10 +635,7 @@ public void testUnionWithSingleNonNullType() throws Exception {
Collections.singletonList(new Schema.Field("value", Schema.createUnion(Schema.create(Schema.Type.STRING)), null, null)));
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
// Parquet writer
ParquetWriter parquetWriter =
AvroParquetWriter.builder(file).withSchema(avroSchema)
@@ -678,6 +656,46 @@ public void testUnionWithSingleNonNullType() throws Exception {
assertEquals(str("theValue"), nextRecord.get("value"));
}
+ @Test
+ public void testDuplicatedValuesWithDictionary() throws Exception {
+ Schema schema = SchemaBuilder.record("spark_schema")
+ .fields().optionalBytes("value").endRecord();
+
+ Path file = new Path(createTempFile().getPath());
+
+ String[] records = {"one", "two", "three", "three", "two", "one", "zero"};
+ try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
+ .<GenericData.Record>builder(file)
+ .withSchema(schema)
+ .withConf(testConf)
+ .build()) {
+ for (String record : records) {
+ writer.write(new GenericRecordBuilder(schema)
+ .set("value", record.getBytes()).build());
+ }
+ }
+
+ try (ParquetReader<GenericRecord> reader = AvroParquetReader
+ .<GenericRecord>builder(file)
+ .withConf(testConf).build()) {
+ GenericRecord rec;
+ int i = 0;
+ while ((rec = reader.read()) != null) {
+ ByteBuffer buf = (ByteBuffer) rec.get("value");
+ byte[] bytes = new byte[buf.remaining()];
+ buf.get(bytes);
+ assertEquals(records[i++], new String(bytes));
+ }
+ }
+ }
+
+ private File createTempFile() throws IOException {
+ File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+ tmp.deleteOnExit();
+ tmp.delete();
+ return tmp;
+ }
+
/**
* Return a String or Utf8 depending on whether compatibility is on
*/
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Data loss on duplicate values with AvroParquetWriter/Reader
> -----------------------------------------------------------
>
> Key: PARQUET-1407
> URL: https://issues.apache.org/jira/browse/PARQUET-1407
> Project: Parquet
> Issue Type: Bug
> Components: parquet-avro
> Affects Versions: 1.9.0, 1.10.0, 1.8.3
> Reporter: Scott Carey
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.11.0
>
>
> {code:java}
> public class Blah {
> private static Path parquetFile = new Path("oops");
> private static Schema schema = SchemaBuilder.record("spark_schema")
> .fields().optionalBytes("value").endRecord();
> private static GenericData.Record recordFor(String value) {
> return new GenericRecordBuilder(schema)
> .set("value", value.getBytes()).build();
> }
> public static void main(String ... args) throws IOException {
> try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
> .<GenericData.Record>builder(parquetFile)
> .withSchema(schema)
> .build()) {
> writer.write(recordFor("one"));
> writer.write(recordFor("two"));
> writer.write(recordFor("three"));
> writer.write(recordFor("three"));
> writer.write(recordFor("two"));
> writer.write(recordFor("one"));
> writer.write(recordFor("zero"));
> }
> try (ParquetReader<GenericRecord> reader = AvroParquetReader
> .<GenericRecord>builder(parquetFile)
> .withConf(new Configuration()).build()) {
> GenericRecord rec;
> int i = 0;
> while ((rec = reader.read()) != null) {
> ByteBuffer buf = (ByteBuffer) rec.get("value");
> byte[] bytes = new byte[buf.remaining()];
> buf.get(bytes);
> System.out.println("rec " + i++ + ": " + new String(bytes));
> }
> }
> }
> }
> {code}
> Expected output:
> {noformat}
> rec 0: one
> rec 1: two
> rec 2: three
> rec 3: three
> rec 4: two
> rec 5: one
> rec 6: zero{noformat}
> Actual:
> {noformat}
> rec 0: one
> rec 1: two
> rec 2: three
> rec 3:
> rec 4:
> rec 5:
> rec 6: zero{noformat}
>
> This was found when we started unexpectedly getting empty byte[] values
> back in Spark (Spark 2.3.1 and Parquet 1.8.3). I have not tried to
> reproduce with Parquet 1.9.0, but it's a bad enough bug that I would like a
> 1.8.4 release I can drop in to replace 1.8.3 without any binary
> compatibility issues.
> Duplicate byte[] values are lost.
>
> A few clues:
> If I do not call ByteBuffer.get(), the value returned by
> ByteBuffer.remaining() does not drop to zero. I suspect a ByteBuffer is
> being recycled across records, and the call to ByteBuffer.get() mutates its
> position. I wonder if an appropriately placed ByteBuffer.duplicate() would
> fix it (a sketch of this appears after the quoted issue).
>
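
The hypothesis above is exactly what the merged patch does:
prepareDictionaryValue() returns value.duplicate(), so the cached dictionary
entry is never consumed. On releases without the fix, a possible consumer-side
workaround (a minimal sketch using only standard java.nio; the helper name is
invented here) is to copy through a duplicate so the shared buffer's position
is never advanced -- though this only helps if every consumer of the value
reads defensively:
{code:java}
import java.nio.ByteBuffer;

public class SafeRead {
  // Copy the contents without advancing the (possibly shared) buffer's position.
  static byte[] copyWithoutConsuming(ByteBuffer shared) {
    ByteBuffer view = shared.duplicate(); // same bytes, independent position/limit
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);                      // consumes only the local view
    return bytes;
  }

  public static void main(String[] args) {
    ByteBuffer shared = ByteBuffer.wrap("three".getBytes());
    System.out.println(new String(copyWithoutConsuming(shared))); // three
    System.out.println(new String(copyWithoutConsuming(shared))); // still three
  }
}
{code}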
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)