This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push: new 542ab3e PARQUET-1407: Avro: Fix binary values returned from dictionary encoding (#552) 542ab3e is described below commit 542ab3e2b321d5f755f3e9c6b997a458f8cf0f5e Author: nandorKollar <nandorkol...@users.noreply.github.com> AuthorDate: Mon Nov 19 23:07:55 2018 +0100 PARQUET-1407: Avro: Fix binary values returned from dictionary encoding (#552) * PARQUET-1407: Add test case for PARQUET-1407 to demonstrate the issue * PARQUET-1407: Fix binary values from dictionary encoding. Closes #551. --- .../org/apache/parquet/avro/AvroConverters.java | 11 ++- .../org/apache/parquet/avro/TestReadWrite.java | 84 +++++++++++++--------- 2 files changed, 61 insertions(+), 34 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java index 817f074..cc49cc2 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java @@ -76,9 +76,13 @@ public class AvroConverters { } } + public T prepareDictionaryValue(T value) { + return value; + } + @Override public void addValueFromDictionary(int dictionaryId) { - parent.add(dict[dictionaryId]); + parent.add(prepareDictionaryValue(dict[dictionaryId])); } } @@ -220,6 +224,11 @@ public class AvroConverters { public ByteBuffer convert(Binary binary) { return ByteBuffer.wrap(binary.getBytes()); } + + @Override + public ByteBuffer prepareDictionaryValue(ByteBuffer value) { + return value.duplicate(); + } } static final class FieldStringConverter extends BinaryConverter<String> { diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 2335e36..69a73cb 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Resources; import java.io.File; +import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; @@ -37,6 +38,7 @@ import java.util.Random; import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; @@ -87,10 +89,7 @@ public class TestReadWrite { Schema schema = new Schema.Parser().parse( Resources.getResource("array.avsc").openStream()); - File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); - tmp.deleteOnExit(); - tmp.delete(); - Path file = new Path(tmp.getPath()); + Path file = new Path(createTempFile().getPath()); ParquetWriter<GenericRecord> writer = AvroParquetWriter .<GenericRecord>builder(file) @@ -117,10 +116,7 @@ public class TestReadWrite { Schema schema = new Schema.Parser().parse( Resources.getResource("map.avsc").openStream()); - File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); - tmp.deleteOnExit(); - tmp.delete(); - Path file = new Path(tmp.getPath()); + Path file = new Path(createTempFile().getPath()); ParquetWriter<GenericRecord> writer = AvroParquetWriter .<GenericRecord>builder(file) @@ -147,10 +143,7 @@ public class TestReadWrite { Schema schema = new Schema.Parser().parse( Resources.getResource("map_with_nulls.avsc").openStream()); - File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); - tmp.deleteOnExit(); - tmp.delete(); - Path file = new Path(tmp.getPath()); + Path file = new Path(createTempFile().getPath()); ParquetWriter<GenericRecord> writer = AvroParquetWriter .<GenericRecord>builder(file) @@ -182,10 +175,7 @@ public class 
TestReadWrite { schema.setFields(Lists.newArrayList( new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null))); - File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); - tmp.deleteOnExit(); - tmp.delete(); - Path file = new Path(tmp.getPath()); + Path file = new Path(createTempFile().getPath()); ParquetWriter<GenericRecord> writer = AvroParquetWriter .<GenericRecord>builder(file) @@ -209,10 +199,7 @@ public class TestReadWrite { Schema schema = new Schema.Parser().parse( Resources.getResource("map.avsc").openStream()); - File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); - tmp.deleteOnExit(); - tmp.delete(); - Path file = new Path(tmp.getPath()); + Path file = new Path(createTempFile().getPath()); ParquetWriter<GenericRecord> writer = AvroParquetWriter .<GenericRecord>builder(file) @@ -346,11 +333,8 @@ public class TestReadWrite { Schema schema = new Schema.Parser().parse( Resources.getResource("all.avsc").openStream()); - File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); - tmp.deleteOnExit(); - tmp.delete(); - Path file = new Path(tmp.getPath()); - + Path file = new Path(createTempFile().getPath()); + ParquetWriter<GenericRecord> writer = AvroParquetWriter .<GenericRecord>builder(file) .withSchema(schema) @@ -429,10 +413,7 @@ public class TestReadWrite { @Test public void testAllUsingDefaultAvroSchema() throws Exception { - File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); - tmp.deleteOnExit(); - tmp.delete(); - Path file = new Path(tmp.getPath()); + Path file = new Path(createTempFile().getPath()); // write file using Parquet APIs ParquetWriter<Map<String, Object>> parquetWriter = new ParquetWriter<Map<String, Object>>(file, @@ -654,10 +635,7 @@ public class TestReadWrite { Collections.singletonList(new Schema.Field("value", Schema.createUnion(Schema.create(Schema.Type.STRING)), null, null))); - File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); - 
tmp.deleteOnExit(); - tmp.delete(); - Path file = new Path(tmp.getPath()); + Path file = new Path(createTempFile().getPath()); // Parquet writer ParquetWriter parquetWriter = AvroParquetWriter.builder(file).withSchema(avroSchema) @@ -678,6 +656,46 @@ public class TestReadWrite { assertEquals(str("theValue"), nextRecord.get("value")); } + @Test + public void testDuplicatedValuesWithDictionary() throws Exception { + Schema schema = SchemaBuilder.record("spark_schema") + .fields().optionalBytes("value").endRecord(); + + Path file = new Path(createTempFile().getPath()); + + String[] records = {"one", "two", "three", "three", "two", "one", "zero"}; + try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter + .<GenericData.Record>builder(file) + .withSchema(schema) + .withConf(testConf) + .build()) { + for (String record : records) { + writer.write(new GenericRecordBuilder(schema) + .set("value", record.getBytes()).build()); + } + } + + try (ParquetReader<GenericRecord> reader = AvroParquetReader + .<GenericRecord>builder(file) + .withConf(testConf).build()) { + GenericRecord rec; + int i = 0; + while ((rec = reader.read()) != null) { + ByteBuffer buf = (ByteBuffer) rec.get("value"); + byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + assertEquals(records[i++], new String(bytes)); + } + } + } + + private File createTempFile() throws IOException { + File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); + tmp.deleteOnExit(); + tmp.delete(); + return tmp; + } + /** * Return a String or Utf8 depending on whether compatibility is on */