This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 542ab3e PARQUET-1407: Avro: Fix binary values returned from
dictionary encoding (#552)
542ab3e is described below
commit 542ab3e2b321d5f755f3e9c6b997a458f8cf0f5e
Author: nandorKollar <[email protected]>
AuthorDate: Mon Nov 19 23:07:55 2018 +0100
PARQUET-1407: Avro: Fix binary values returned from dictionary encoding
(#552)
* PARQUET-1407: Add test case for PARQUET-1407 to demonstrate the issue
* PARQUET-1407: Fix binary values from dictionary encoding.
Closes #551.
---
.../org/apache/parquet/avro/AvroConverters.java | 11 ++-
.../org/apache/parquet/avro/TestReadWrite.java | 84 +++++++++++++---------
2 files changed, 61 insertions(+), 34 deletions(-)
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
index 817f074..cc49cc2 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
@@ -76,9 +76,13 @@ public class AvroConverters {
}
}
+    /**
+     * Hook applied to each value taken from the dictionary before it is handed
+     * to the parent container. The default hands back the cached entry as-is;
+     * subclasses whose converted values carry mutable state (for example a
+     * ByteBuffer position) can override it to return an independent copy/view.
+     *
+     * @param dictionaryValue the cached, already-converted dictionary entry
+     * @return the value to pass to the parent
+     */
+    public T prepareDictionaryValue(T dictionaryValue) {
+      return dictionaryValue;
+    }
+
+    /**
+     * Emits the cached dictionary entry for {@code dictionaryId}. The entry is
+     * routed through {@link #prepareDictionaryValue} because the same cached
+     * object is returned for every occurrence of that id; subclasses can use
+     * the hook to avoid sharing it across records. (NOTE(review): {@code dict}
+     * is presumably populated from the column dictionary elsewhere in this
+     * class; that code is not visible here.)
+     */
@Override
public void addValueFromDictionary(int dictionaryId) {
- parent.add(dict[dictionaryId]);
+ parent.add(prepareDictionaryValue(dict[dictionaryId]));
}
}
@@ -220,6 +224,11 @@ public class AvroConverters {
public ByteBuffer convert(Binary binary) {
+      // ByteBuffer.wrap does not copy: the returned buffer is backed directly
+      // by the array returned from binary.getBytes().
return ByteBuffer.wrap(binary.getBytes());
}
+
+    /**
+     * Hands each reader an independent view of the cached dictionary buffer.
+     * The duplicate shares the underlying bytes but has its own position and
+     * limit, so consuming one record's value cannot drain the buffer seen by
+     * later records that reference the same dictionary entry.
+     */
+    @Override
+    public ByteBuffer prepareDictionaryValue(ByteBuffer cached) {
+      return cached.duplicate();
+    }
}
static final class FieldStringConverter extends BinaryConverter<String> {
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 2335e36..69a73cb 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import java.io.File;
+import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ import java.util.Random;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
@@ -87,10 +89,7 @@ public class TestReadWrite {
Schema schema = new Schema.Parser().parse(
Resources.getResource("array.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -117,10 +116,7 @@ public class TestReadWrite {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -147,10 +143,7 @@ public class TestReadWrite {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map_with_nulls.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -182,10 +175,7 @@ public class TestReadWrite {
schema.setFields(Lists.newArrayList(
new Schema.Field("mymap",
Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -209,10 +199,7 @@ public class TestReadWrite {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
@@ -346,11 +333,8 @@ public class TestReadWrite {
Schema schema = new Schema.Parser().parse(
Resources.getResource("all.avsc").openStream());
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
-
+ Path file = new Path(createTempFile().getPath());
+
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
.withSchema(schema)
@@ -429,10 +413,7 @@ public class TestReadWrite {
@Test
public void testAllUsingDefaultAvroSchema() throws Exception {
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
// write file using Parquet APIs
ParquetWriter<Map<String, Object>> parquetWriter = new
ParquetWriter<Map<String, Object>>(file,
@@ -654,10 +635,7 @@ public class TestReadWrite {
Collections.singletonList(new Schema.Field("value",
Schema.createUnion(Schema.create(Schema.Type.STRING)), null, null)));
- File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
- tmp.deleteOnExit();
- tmp.delete();
- Path file = new Path(tmp.getPath());
+ Path file = new Path(createTempFile().getPath());
// Parquet writer
ParquetWriter parquetWriter =
AvroParquetWriter.builder(file).withSchema(avroSchema)
@@ -678,6 +656,46 @@ public class TestReadWrite {
assertEquals(str("theValue"), nextRecord.get("value"));
}
+  /**
+   * Regression test for PARQUET-1407: repeated BYTES values read back through
+   * the dictionary path must each decode to their original content, even after
+   * an earlier record's ByteBuffer for the same value has been consumed.
+   */
+  @Test
+  public void testDuplicatedValuesWithDictionary() throws Exception {
+    Schema schema = SchemaBuilder.record("spark_schema")
+        .fields().optionalBytes("value").endRecord();
+
+    Path file = new Path(createTempFile().getPath());
+
+    // Repeated values are meant to exercise dictionary-encoded reads
+    // (assumes dictionary encoding is enabled for testConf -- confirm).
+    String[] records = {"one", "two", "three", "three", "two", "one", "zero"};
+    try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
+        .<GenericData.Record>builder(file)
+        .withSchema(schema)
+        .withConf(testConf)
+        .build()) {
+      for (String record : records) {
+        writer.write(new GenericRecordBuilder(schema)
+            .set("value", record.getBytes(java.nio.charset.StandardCharsets.UTF_8))
+            .build());
+      }
+    }
+
+    try (ParquetReader<GenericRecord> reader = AvroParquetReader
+        .<GenericRecord>builder(file)
+        .withConf(testConf).build()) {
+      GenericRecord rec;
+      int i = 0;
+      while ((rec = reader.read()) != null) {
+        ByteBuffer buf = (ByteBuffer) rec.get("value");
+        byte[] bytes = new byte[buf.remaining()];
+        // get(byte[]) advances the buffer's position; if the reader shared one
+        // buffer per dictionary entry, a repeated value would read back empty.
+        buf.get(bytes);
+        assertEquals(records[i++],
+            new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
+      }
+      // Without this, a reader that silently dropped trailing rows would pass.
+      assertEquals(records.length, i);
+    }
+  }
+
+  /**
+   * Returns a unique temp-file path that does not currently exist. A temp file
+   * is created to reserve the name and is then deleted again, presumably so
+   * the Parquet writer can create the file itself (confirm against writer
+   * mode). The path is also registered for deletion when the JVM exits.
+   *
+   * @return a fresh, non-existent file path
+   * @throws IOException if the temp file cannot be created or removed
+   */
+  private File createTempFile() throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    // File.delete() reports failure only through its return value; fail fast
+    // here instead of letting a later writer trip over a leftover file.
+    if (!tmp.delete()) {
+      throw new IOException("Could not delete temporary file " + tmp);
+    }
+    return tmp;
+  }
+
/**
* Return a String or Utf8 depending on whether compatibility is on
*/