This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch refactor-pinot-reader-reader-dependency
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 21608d4d169417e73e2dc42a43a7c56ec1cdb0f5
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Nov 30 15:43:02 2019 -0800

    Refactor to ensure pinot-orc and pinot-parquet depend only on the pinot-spi module
---
 .../pinot/core/data/readers/AvroRecordReader.java  |   1 +
 .../pinot/core/data/readers/CSVRecordReader.java   |   1 +
 .../pinot/core/data/readers/JSONRecordReader.java  |   1 +
 .../core/data/readers/ThriftRecordReader.java      |   1 +
 .../java/org/apache/pinot/core/util/AvroUtils.java |  45 +++++++-
 .../core/data/readers/RecordReaderUtilsTest.java   |   1 +
 pinot-orc/pom.xml                                  |   2 +-
 pinot-parquet/pom.xml                              |   9 +-
 .../pinot/parquet/data/readers}/AvroUtils.java     | 113 +++++++++++++++++++--
 .../parquet/data/readers/ParquetRecordReader.java  |   4 +-
 .../pinot/parquet/data/readers/ParquetUtils.java   |   1 +
 .../data/readers/ParquetRecordReaderTest.java      |  30 +++++-
 .../pinot/spi}/data/readers/RecordReaderUtils.java |   6 +-
 13 files changed, 188 insertions(+), 27 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
index a0763fb..7842628 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.util.AvroUtils;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
index 97d3f67..29e1db4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
@@ -32,6 +32,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
index ba92ff6..e2b4ab7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
index 30b750d..dd5dccc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TFieldIdEnum;
 import org.apache.thrift.protocol.TBinaryProtocol;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
index 60b28cc..c6a1c36 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -32,6 +34,7 @@ import javax.annotation.Nullable;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -41,7 +44,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.common.utils.AvroSchemaUtil;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -327,7 +330,45 @@ public class AvroUtils {
         return;
       }
     }
+    to.putField(fieldName, RecordReaderUtils.convert(fieldSpec, 
convert(fieldSpec, from.get(fieldName))));
+  }
 
-    to.putField(fieldName, RecordReaderUtils.convert(fieldSpec, 
from.get(fieldName)));
+  /**
+   * Converts the value based on the given field spec.
+   */
+  public static Object convert(FieldSpec fieldSpec, @Nullable Object value) {
+    if (fieldSpec.isSingleValueField()) {
+      return handleSingleValue(value);
+    } else {
+      return handleMultiValue((Collection) value);
+    }
+  }
+
+  /**
+   * Recursively unwraps an Avro {@code GenericData.Record} to its first field (index 0); other values are returned as-is.
+   */
+  public static Object handleSingleValue(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    if (value instanceof GenericData.Record) {
+      return handleSingleValue(((GenericData.Record) value).get(0));
+    }
+    return value;
+  }
+
+  /**
+   * Applies {@link #handleSingleValue} to each element; returns null for a null or empty collection.
+   */
+  public static Object handleMultiValue(@Nullable Collection values) {
+    if (values == null || values.isEmpty()) {
+      return null;
+    }
+    int numValues = values.size();
+    List<Object> list = new ArrayList<>(numValues);
+    for (Object value : values) {
+      list.add(handleSingleValue(value));
+    }
+    return list;
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderUtilsTest.java
index 23fcae8..823ddf6 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderUtilsTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
diff --git a/pinot-orc/pom.xml b/pinot-orc/pom.xml
index 54abdcd..477cdb5 100644
--- a/pinot-orc/pom.xml
+++ b/pinot-orc/pom.xml
@@ -66,7 +66,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-core</artifactId>
+      <artifactId>pinot-spi</artifactId>
     </dependency>
     <dependency>
       <groupId>org.testng</groupId>
diff --git a/pinot-parquet/pom.xml b/pinot-parquet/pom.xml
index 672a371..7939daa 100644
--- a/pinot-parquet/pom.xml
+++ b/pinot-parquet/pom.xml
@@ -45,7 +45,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-core</artifactId>
+      <artifactId>pinot-spi</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
@@ -75,13 +75,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-core</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java 
b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/AvroUtils.java
similarity index 79%
copy from pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
copy to 
pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/AvroUtils.java
index 60b28cc..85b108f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
+++ 
b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/AvroUtils.java
@@ -16,12 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.util;
+package org.apache.pinot.parquet.data.readers;
 
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -32,6 +36,7 @@ import javax.annotation.Nullable;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -39,13 +44,13 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.common.utils.AvroSchemaUtil;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
+// TODO: deprecate this class once a pinot-avro module provides the Avro-related utilities.
 public class AvroUtils {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(AvroUtils.class);
   public static final String MAP_KEY_COLUMN_SUFFIX = "__KEYS";
@@ -263,9 +268,9 @@ public class AvroUtils {
       org.apache.avro.Schema fieldSchema = 
extractSupportedSchema(field.schema());
       org.apache.avro.Schema.Type fieldType = fieldSchema.getType();
       if (fieldType == org.apache.avro.Schema.Type.ARRAY) {
-        return 
AvroSchemaUtil.valueOf(extractSupportedSchema(fieldSchema.getElementType()).getType());
+        return 
valueOf(extractSupportedSchema(fieldSchema.getElementType()).getType());
       } else {
-        return AvroSchemaUtil.valueOf(fieldType);
+        return valueOf(fieldType);
       }
     } catch (Exception e) {
       throw new RuntimeException("Caught exception while extracting data type 
from field: " + field.name(), e);
@@ -327,7 +332,101 @@ public class AvroUtils {
         return;
       }
     }
+    to.putField(fieldName, RecordReaderUtils.convert(fieldSpec, 
convert(fieldSpec, from.get(fieldName))));
+  }
+
+  /**
+   * Converts the value based on the given field spec.
+   */
+  public static Object convert(FieldSpec fieldSpec, @Nullable Object value) {
+    if (fieldSpec.isSingleValueField()) {
+      return handleSingleValue(value);
+    } else {
+      return handleMultiValue((Collection) value);
+    }
+  }
 
-    to.putField(fieldName, RecordReaderUtils.convert(fieldSpec, 
from.get(fieldName)));
+  /**
+   * Recursively unwraps an Avro {@code GenericData.Record} to its first field (index 0); other values are returned as-is.
+   */
+  public static Object handleSingleValue(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+    if (value instanceof GenericData.Record) {
+      return handleSingleValue(((GenericData.Record) value).get(0));
+    }
+    return value;
+  }
+
+  /**
+   * Applies {@link #handleSingleValue} to each element; returns null for a null or empty collection.
+   */
+  public static Object handleMultiValue(@Nullable Collection values) {
+    if (values == null || values.isEmpty()) {
+      return null;
+    }
+    int numValues = values.size();
+    List<Object> list = new ArrayList<>(numValues);
+    for (Object value : values) {
+      list.add(handleSingleValue(value));
+    }
+    return list;
+  }
+
+  /**
+   * Returns the data type stored in Pinot that is associated with the given 
Avro type.
+   */
+  public static FieldSpec.DataType valueOf(org.apache.avro.Schema.Type 
avroType) {
+    switch (avroType) {
+      case INT:
+        return FieldSpec.DataType.INT;
+      case LONG:
+        return FieldSpec.DataType.LONG;
+      case FLOAT:
+        return FieldSpec.DataType.FLOAT;
+      case DOUBLE:
+        return FieldSpec.DataType.DOUBLE;
+      case BOOLEAN:
+      case STRING:
+      case ENUM:
+        return FieldSpec.DataType.STRING;
+      case BYTES:
+        return FieldSpec.DataType.BYTES;
+      default:
+        throw new UnsupportedOperationException("Unsupported Avro type: " + 
avroType);
+    }
+  }
+
+  public static ObjectNode toAvroSchemaJsonObject(FieldSpec fieldSpec) {
+    ObjectNode jsonSchema = JsonUtils.newObjectNode();
+    jsonSchema.put("name", fieldSpec.getName());
+    switch (fieldSpec.getDataType()) {
+      case INT:
+        jsonSchema.set("type", convertStringsToJsonArray("null", "int"));
+        return jsonSchema;
+      case LONG:
+        jsonSchema.set("type", convertStringsToJsonArray("null", "long"));
+        return jsonSchema;
+      case FLOAT:
+        jsonSchema.set("type", convertStringsToJsonArray("null", "float"));
+        return jsonSchema;
+      case DOUBLE:
+        jsonSchema.set("type", convertStringsToJsonArray("null", "double"));
+        return jsonSchema;
+      case STRING:
+        jsonSchema.set("type", convertStringsToJsonArray("null", "string"));
+        return jsonSchema;
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
+
+  private static ArrayNode convertStringsToJsonArray(String... strings) {
+    ArrayNode jsonArray = JsonUtils.newArrayNode();
+    for (String string : strings) {
+      jsonArray.add(string);
+    }
+    return jsonArray;
   }
 }
diff --git 
a/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetRecordReader.java
 
b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetRecordReader.java
index fbf0eab..60be28e 100644
--- 
a/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetRecordReader.java
+++ 
b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetRecordReader.java
@@ -30,8 +30,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import org.apache.pinot.core.data.readers.RecordReaderUtils;
-import org.apache.pinot.core.util.AvroUtils;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
@@ -101,4 +100,5 @@ public class ParquetRecordReader implements RecordReader {
       throws IOException {
     _reader.close();
   }
+
 }
diff --git 
a/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetUtils.java
 
b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetUtils.java
index 5f1553b..b19694f4 100644
--- 
a/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetUtils.java
+++ 
b/pinot-parquet/src/main/java/org/apache/pinot/parquet/data/readers/ParquetUtils.java
@@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
 
 
 public class ParquetUtils {
diff --git 
a/pinot-parquet/src/test/java/org/apache/pinot/parquet/data/readers/ParquetRecordReaderTest.java
 
b/pinot-parquet/src/test/java/org/apache/pinot/parquet/data/readers/ParquetRecordReaderTest.java
index e14e2c6..9e609f6 100644
--- 
a/pinot-parquet/src/test/java/org/apache/pinot/parquet/data/readers/ParquetRecordReaderTest.java
+++ 
b/pinot-parquet/src/test/java/org/apache/pinot/parquet/data/readers/ParquetRecordReaderTest.java
@@ -27,15 +27,23 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.pinot.core.data.readers.RecordReaderTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-public class ParquetRecordReaderTest extends RecordReaderTest {
+public class ParquetRecordReaderTest {
+  protected static final String[] COLUMNS = {"INT_SV", "INT_MV"};
   private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"ParquetRecordReaderTest");
   private static final File DATA_FILE = new File(TEMP_DIR, "data.parquet");
+  protected static final org.apache.pinot.spi.data.Schema
+      SCHEMA = new 
org.apache.pinot.spi.data.Schema.SchemaBuilder().addMetric(COLUMNS[0], 
FieldSpec.DataType.INT).build();
+  private static final Object[][] RECORDS = {{5, new int[]{10, 15, 20}}, {25, 
new int[]{30, 35, 40}}, {null, null}};
+  private static final Object[] DEFAULT_VALUES = {0, new int[]{-1}};
 
   @BeforeClass
   public void setUp()
@@ -77,6 +85,24 @@ public class ParquetRecordReaderTest extends 
RecordReaderTest {
     }
   }
 
+
+  protected static void checkValue(RecordReader recordReader)
+      throws Exception {
+    for (Object[] expectedRecord : RECORDS) {
+      GenericRow actualRecord = recordReader.next();
+      GenericRow transformedRecord = actualRecord;
+
+      int numColumns = COLUMNS.length;
+      for (int i = 0; i < numColumns; i++) {
+        if (expectedRecord[i] != null) {
+          Assert.assertEquals(transformedRecord.getValue(COLUMNS[i]), 
expectedRecord[i]);
+        } else {
+          Assert.assertEquals(transformedRecord.getValue(COLUMNS[i]), 
DEFAULT_VALUES[i]);
+        }
+      }
+    }
+    Assert.assertFalse(recordReader.hasNext());
+  }
   @Test
   public void testParquetRecordReader()
       throws Exception {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
index 6764780..219b7e5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.data.readers;
+package org.apache.pinot.spi.data.readers;
 
 import com.google.common.base.Preconditions;
 import java.io.BufferedInputStream;
@@ -34,7 +34,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.zip.GZIPInputStream;
 import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericData;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
@@ -113,9 +112,6 @@ public class RecordReaderUtils {
     if (value == null) {
       return null;
     }
-    if (value instanceof GenericData.Record) {
-      return convertSingleValue(fieldSpec, ((GenericData.Record) 
value).get(0));
-    }
     DataType dataType = fieldSpec.getDataType();
     if (dataType == FieldSpec.DataType.BYTES) {
       // Avro ByteBuffer maps to byte[]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to