This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch refactor-pinot-reader-reader-dependency in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 21608d4d169417e73e2dc42a43a7c56ec1cdb0f5 Author: Xiang Fu <[email protected]> AuthorDate: Sat Nov 30 15:43:02 2019 -0800 Refactor to ensure pinot-orc and pinot-parquet only depends on pinot-spi module --- .../pinot/core/data/readers/AvroRecordReader.java | 1 + .../pinot/core/data/readers/CSVRecordReader.java | 1 + .../pinot/core/data/readers/JSONRecordReader.java | 1 + .../core/data/readers/ThriftRecordReader.java | 1 + .../java/org/apache/pinot/core/util/AvroUtils.java | 45 +++++++- .../core/data/readers/RecordReaderUtilsTest.java | 1 + pinot-orc/pom.xml | 2 +- pinot-parquet/pom.xml | 9 +- .../pinot/parquet/data/readers}/AvroUtils.java | 113 +++++++++++++++++++-- .../parquet/data/readers/ParquetRecordReader.java | 4 +- .../pinot/parquet/data/readers/ParquetUtils.java | 1 + .../data/readers/ParquetRecordReaderTest.java | 30 +++++- .../pinot/spi}/data/readers/RecordReaderUtils.java | 6 +- 13 files changed, 188 insertions(+), 27 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java index a0763fb..7842628 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java @@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.core.util.AvroUtils; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java index 97d3f67..29e1db4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java @@ -32,6 +32,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java index ba92ff6..e2b4ab7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java @@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; import org.apache.pinot.spi.utils.JsonUtils; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java index 30b750d..dd5dccc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java @@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; import org.apache.thrift.TBase; import org.apache.thrift.TFieldIdEnum; import org.apache.thrift.protocol.TBinaryProtocol; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java index 60b28cc..c6a1c36 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java @@ -22,6 +22,8 @@ import com.google.common.base.Preconditions; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -32,6 +34,7 @@ import javax.annotation.Nullable; import org.apache.avro.Schema.Field; import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.pinot.spi.data.DimensionFieldSpec; @@ -41,7 +44,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.TimeFieldSpec; import org.apache.pinot.common.utils.AvroSchemaUtil; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.core.data.readers.RecordReaderUtils; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -327,7 +330,45 @@ public class AvroUtils { return; } } + to.putField(fieldName, RecordReaderUtils.convert(fieldSpec, convert(fieldSpec, from.get(fieldName)))); + } - to.putField(fieldName, RecordReaderUtils.convert(fieldSpec, from.get(fieldName))); + /** + * Converts the value based on the given field spec. + */ + public static Object convert(FieldSpec fieldSpec, @Nullable Object value) { + if (fieldSpec.isSingleValueField()) { + return handleSingleValue(value); + } else { + return handleMultiValue((Collection) value); + } + } + + /** + * Converts the value based on the given field spec. + */ + public static Object handleSingleValue(@Nullable Object value) { + if (value == null) { + return null; + } + if (value instanceof GenericData.Record) { + return handleSingleValue(((GenericData.Record) value).get(0)); + } + return value; + } + + /** + * Converts the value based on the given field spec. + */ + public static Object handleMultiValue(@Nullable Collection values) { + if (values == null || values.isEmpty()) { + return null; + } + int numValues = values.size(); + List<Object> list = new ArrayList<>(numValues); + for (Object value : values) { + list.add(handleSingleValue(value)); + } + return list; } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderUtilsTest.java index 23fcae8..823ddf6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderUtilsTest.java @@ -29,6 +29,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.TimeFieldSpec; import org.apache.pinot.spi.data.TimeGranularitySpec; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; diff --git a/pinot-orc/pom.xml b/pinot-orc/pom.xml index 54abdcd..477cdb5 100644 --- a/pinot-orc/pom.xml +++ b/pinot-orc/pom.xml @@ -66,7 +66,7 @@ </dependency> <dependency> <groupId>org.apache.pinot</groupId> - <artifactId>pinot-core</artifactId> + <artifactId>pinot-spi</artifactId> </dependency> <dependency> <groupId>org.testng</groupId> diff --git a/pinot-parquet/pom.xml b/pinot-parquet/pom.xml index 672a371..7939daa 100644 --- a/pinot-parquet/pom.xml +++ b/pinot-parquet/pom.xml @@ -45,7 +45,7 @@ <dependencies> <dependency> <groupId>org.apache.pinot</groupId> - <artifactId>pinot-core</artifactId> + <artifactId>pinot-spi</artifactId> </dependency> <dependency> <groupId>org.apache.parquet</groupId> @@ -75,13 +75,6 @@ </exclusions> </dependency> <dependency> - <groupId>org.apache.pinot</groupId> - <artifactId>pinot-core</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <scope>test</scope> diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/AvroUtils.java similarity index 79% copy from pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java copy to pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/AvroUtils.java index 60b28cc..85b108f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java +++ b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/AvroUtils.java @@ -16,12 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.util; +package org.apache.pinot.parquet.data.readers; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -32,6 +36,7 @@ import javax.annotation.Nullable; import org.apache.avro.Schema.Field; import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.pinot.spi.data.DimensionFieldSpec; @@ -39,13 +44,13 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.TimeFieldSpec; -import org.apache.pinot.common.utils.AvroSchemaUtil; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.core.data.readers.RecordReaderUtils; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; +import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +// Deprecated: this should be deprecated once we have pinot-avro module which provides Avro Related Utils public class AvroUtils { private static final Logger LOGGER = LoggerFactory.getLogger(AvroUtils.class); public static final String MAP_KEY_COLUMN_SUFFIX = "__KEYS"; @@ -263,9 +268,9 @@ public class AvroUtils { org.apache.avro.Schema fieldSchema = extractSupportedSchema(field.schema()); org.apache.avro.Schema.Type fieldType = fieldSchema.getType(); if (fieldType == org.apache.avro.Schema.Type.ARRAY) { - return AvroSchemaUtil.valueOf(extractSupportedSchema(fieldSchema.getElementType()).getType()); + return valueOf(extractSupportedSchema(fieldSchema.getElementType()).getType()); } else { - return AvroSchemaUtil.valueOf(fieldType); + return valueOf(fieldType); } } catch (Exception e) { throw new RuntimeException("Caught exception while extracting data type from field: " + field.name(), e); @@ -327,7 +332,101 @@ public class AvroUtils { return; } } + to.putField(fieldName, RecordReaderUtils.convert(fieldSpec, convert(fieldSpec, from.get(fieldName)))); + } + + /** + * Converts the value based on the given field spec. + */ + public static Object convert(FieldSpec fieldSpec, @Nullable Object value) { + if (fieldSpec.isSingleValueField()) { + return handleSingleValue(value); + } else { + return handleMultiValue((Collection) value); + } + } - to.putField(fieldName, RecordReaderUtils.convert(fieldSpec, from.get(fieldName))); + /** + * Converts the value based on the given field spec. + */ + public static Object handleSingleValue(@Nullable Object value) { + if (value == null) { + return null; + } + if (value instanceof GenericData.Record) { + return handleSingleValue(((GenericData.Record) value).get(0)); + } + return value; + } + + /** + * Converts the value based on the given field spec. + */ + public static Object handleMultiValue(@Nullable Collection values) { + if (values == null || values.isEmpty()) { + return null; + } + int numValues = values.size(); + List<Object> list = new ArrayList<>(numValues); + for (Object value : values) { + list.add(handleSingleValue(value)); + } + return list; + } + + /** + * Returns the data type stored in Pinot that is associated with the given Avro type. + */ + public static FieldSpec.DataType valueOf(org.apache.avro.Schema.Type avroType) { + switch (avroType) { + case INT: + return FieldSpec.DataType.INT; + case LONG: + return FieldSpec.DataType.LONG; + case FLOAT: + return FieldSpec.DataType.FLOAT; + case DOUBLE: + return FieldSpec.DataType.DOUBLE; + case BOOLEAN: + case STRING: + case ENUM: + return FieldSpec.DataType.STRING; + case BYTES: + return FieldSpec.DataType.BYTES; + default: + throw new UnsupportedOperationException("Unsupported Avro type: " + avroType); + } + } + + public static ObjectNode toAvroSchemaJsonObject(FieldSpec fieldSpec) { + ObjectNode jsonSchema = JsonUtils.newObjectNode(); + jsonSchema.put("name", fieldSpec.getName()); + switch (fieldSpec.getDataType()) { + case INT: + jsonSchema.set("type", convertStringsToJsonArray("null", "int")); + return jsonSchema; + case LONG: + jsonSchema.set("type", convertStringsToJsonArray("null", "long")); + return jsonSchema; + case FLOAT: + jsonSchema.set("type", convertStringsToJsonArray("null", "float")); + return jsonSchema; + case DOUBLE: + jsonSchema.set("type", convertStringsToJsonArray("null", "double")); + return jsonSchema; + case STRING: + jsonSchema.set("type", convertStringsToJsonArray("null", "string")); + return jsonSchema; + default: + throw new UnsupportedOperationException(); + } + } + + private static ArrayNode convertStringsToJsonArray(String... strings) { + ArrayNode jsonArray = JsonUtils.newArrayNode(); + for (String string : strings) { + jsonArray.add(string); + } + return jsonArray; } } diff --git a/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetRecordReader.java b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetRecordReader.java index fbf0eab..60be28e 100644 --- a/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetRecordReader.java +++ b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetRecordReader.java @@ -30,8 +30,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderConfig; -import org.apache.pinot.core.data.readers.RecordReaderUtils; -import org.apache.pinot.core.util.AvroUtils; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; /** @@ -101,4 +100,5 @@ public class ParquetRecordReader implements RecordReader { throws IOException { _reader.close(); } + } diff --git a/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetUtils.java b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetUtils.java index 5f1553b..b19694f4 100644 --- a/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetUtils.java +++ b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetUtils.java @@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.pinot.spi.data.FieldSpec; public class ParquetUtils { diff --git a/pinot-parquet/src/test/java/org/apache/pinot/parquet/data/readers/ParquetRecordReaderTest.java b/pinot-parquet/src/test/java/org/apache/pinot/parquet/data/readers/ParquetRecordReaderTest.java index e14e2c6..9e609f6 100644 --- a/pinot-parquet/src/test/java/org/apache/pinot/parquet/data/readers/ParquetRecordReaderTest.java +++ b/pinot-parquet/src/test/java/org/apache/pinot/parquet/data/readers/ParquetRecordReaderTest.java @@ -27,15 +27,23 @@ import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.pinot.core.data.readers.RecordReaderTest; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -public class ParquetRecordReaderTest extends RecordReaderTest { +public class ParquetRecordReaderTest { + protected static final String[] COLUMNS = {"INT_SV", "INT_MV"}; private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "ParquetRecordReaderTest"); private static final File DATA_FILE = new File(TEMP_DIR, "data.parquet"); + protected static final org.apache.pinot.spi.data.Schema + SCHEMA = new org.apache.pinot.spi.data.Schema.SchemaBuilder().addMetric(COLUMNS[0], FieldSpec.DataType.INT).build(); + private static final Object[][] RECORDS = {{5, new int[]{10, 15, 20}}, {25, new int[]{30, 35, 40}}, {null, null}}; + private static final Object[] DEFAULT_VALUES = {0, new int[]{-1}}; @BeforeClass public void setUp() @@ -77,6 +85,24 @@ public class ParquetRecordReaderTest extends RecordReaderTest { } } + + protected static void checkValue(RecordReader recordReader) + throws Exception { + for (Object[] expectedRecord : RECORDS) { + GenericRow actualRecord = recordReader.next(); + GenericRow transformedRecord = actualRecord; + + int numColumns = COLUMNS.length; + for (int i = 0; i < numColumns; i++) { + if (expectedRecord[i] != null) { + Assert.assertEquals(transformedRecord.getValue(COLUMNS[i]), expectedRecord[i]); + } else { + Assert.assertEquals(transformedRecord.getValue(COLUMNS[i]), DEFAULT_VALUES[i]); + } + } + } + Assert.assertFalse(recordReader.hasNext()); + } @Test public void testParquetRecordReader() throws Exception { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java similarity index 97% rename from pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java index 6764780..219b7e5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.data.readers; +package org.apache.pinot.spi.data.readers; import com.google.common.base.Preconditions; import java.io.BufferedInputStream; @@ -34,7 +34,6 @@ import java.util.Collection; import java.util.List; import java.util.zip.GZIPInputStream; import javax.annotation.Nullable; -import org.apache.avro.generic.GenericData; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; @@ -113,9 +112,6 @@ public class RecordReaderUtils { if (value == null) { return null; } - if (value instanceof GenericData.Record) { - return convertSingleValue(fieldSpec, ((GenericData.Record) value).get(0)); - } DataType dataType = fieldSpec.getDataType(); if (dataType == FieldSpec.DataType.BYTES) { // Avro ByteBuffer maps to byte[] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
