This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d54b04a Deep extraction in Avro and Json RecordExtractor (#5492)
d54b04a is described below
commit d54b04a2562f86dfb3adaa02ff400951d8108738
Author: Neha Pawar <[email protected]>
AuthorDate: Fri Jun 5 18:05:29 2020 -0700
Deep extraction in Avro and Json RecordExtractor (#5492)
- Json handles Map/Collection/single-values
- Avro handles Map/Collection/GenericData.Record/single-values
- Added json_format function to convert json object to json string
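As a quick illustration (not part of this commit), the new json_format scalar function can be exercised the same way the added tests do, through InbuiltFunctionEvaluator; the field name "jsonMap" is just an example, and the InbuiltFunctionEvaluator import path is assumed from the test's package:

    import java.util.Map;
    import org.apache.pinot.core.data.function.InbuiltFunctionEvaluator;
    import org.apache.pinot.spi.data.readers.GenericRow;
    import org.apache.pinot.spi.utils.JsonUtils;

    public class JsonFormatSketch {
      public static void main(String[] args) throws Exception {
        GenericRow row = new GenericRow();
        row.putValue("jsonMap", JsonUtils.stringToObject("{\"k1\":\"foo\",\"k2\":\"bar\"}", Map.class));
        // json_format delegates to JsonUtils.objectToString, so this prints {"k1":"foo","k2":"bar"}
        InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator("json_format(jsonMap)");
        System.out.println(evaluator.evaluate(row));
      }
    }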
---
.../pinot/common/function/JsonFunctions.java | 9 +
...valuatorTest.java => InbuiltFunctionsTest.java} | 59 +++++-
.../src/test/resources/data/test_data-mv.avro | Bin 9583323 -> 7700192 bytes
.../inputformat/avro/AvroRecordExtractor.java | 3 +-
.../pinot/plugin/inputformat/avro/AvroUtils.java | 67 ++++---
.../avro/AvroRecordExtractorComplexTypesTest.java | 209 +++++++++++++++++++++
.../avro/AvroRecordExtractorMapTypeTest.java | 115 ------------
.../inputformat/json/JSONRecordExtractor.java | 54 +-----
...xtractor.java => JSONRecordExtractorUtils.java} | 90 ++++-----
.../json/JSONRecordExtractorUtilsTest.java | 120 ++++++++++++
.../src/test/resources/data/test_data-mv.avro | Bin 9583323 -> 7700192 bytes
.../pinot/spi/data/readers/RecordExtractor.java | 3 +
.../data/readers/AbstractRecordExtractorTest.java | 47 +++--
13 files changed, 514 insertions(+), 262 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java
index 1e5cd75..e0d3b06 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java
@@ -46,4 +46,13 @@ public class JsonFunctions {
return JsonUtils.objectToString(map);
}
+ /**
+ * Converts the given object to a JSON string
+ */
+ @ScalarFunction
+ static String json_format(Object object)
+ throws JsonProcessingException {
+ return JsonUtils.objectToString(object);
+ }
+
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionsTest.java
similarity index 77%
rename from pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionsTest.java
index 9de5956..9532ea8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionsTest.java
@@ -19,19 +19,21 @@
package org.apache.pinot.core.data.function;
import com.google.common.collect.Lists;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.pinot.common.function.DateTimeFunctions;
+import java.util.Map;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
- * Tests the Pinot inbuilt transform functions in {@link DateTimeFunctions} which perform date time conversion
+ * Tests the Pinot inbuilt transform functions
*/
-public class DateTimeFunctionEvaluatorTest {
+public class InbuiltFunctionsTest {
@Test(dataProvider = "dateTimeFunctionsTestDataProvider")
public void testDateTimeTransformFunctions(String transformFunction, List<String> arguments, GenericRow row,
@@ -206,4 +208,55 @@ public class DateTimeFunctionEvaluatorTest {
return inputs.toArray(new Object[0][]);
}
+
+ @Test(dataProvider = "jsonFunctionDataProvider")
+ public void testJsonFunctions(String transformFunction, List<String> arguments, GenericRow row, Object result)
+ throws Exception {
+ InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(transformFunction);
+ Assert.assertEquals(evaluator.getArguments(), arguments);
+ Assert.assertEquals(evaluator.evaluate(row), result);
+ }
+
+ @DataProvider(name = "jsonFunctionDataProvider")
+ public Object[][] jsonFunctionsDataProvider()
+ throws IOException {
+ List<Object[]> inputs = new ArrayList<>();
+
+ // toJsonMapStr
+ GenericRow row0 = new GenericRow();
+ String jsonStr = "{\"k1\":\"foo\",\"k2\":\"bar\"}";
+ row0.putValue("jsonMap", JsonUtils.stringToObject(jsonStr, Map.class));
+ inputs.add(new Object[]{"toJsonMapStr(jsonMap)", Lists.newArrayList("jsonMap"), row0, jsonStr});
+
+ GenericRow row1 = new GenericRow();
+ jsonStr = "{\"k3\":{\"sub1\":10,\"sub2\":1.0},\"k4\":\"baz\",\"k5\":[1,2,3]}";
+ row1.putValue("jsonMap", JsonUtils
+ .stringToObject(jsonStr, Map.class));
+ inputs.add(new Object[]{"toJsonMapStr(jsonMap)", Lists.newArrayList("jsonMap"), row1, jsonStr});
+
+ GenericRow row2 = new GenericRow();
+ jsonStr = "{\"k1\":\"foo\",\"k2\":\"bar\"}";
+ row2.putValue("jsonMap", JsonUtils.stringToObject(jsonStr, Map.class));
+ inputs.add(new Object[]{"json_format(jsonMap)", Lists.newArrayList("jsonMap"), row2, jsonStr});
+
+ GenericRow row3 = new GenericRow();
+ jsonStr = "{\"k3\":{\"sub1\":10,\"sub2\":1.0},\"k4\":\"baz\",\"k5\":[1,2,3]}";
+ row3.putValue("jsonMap", JsonUtils
+ .stringToObject(jsonStr, Map.class));
+ inputs.add(new Object[]{"json_format(jsonMap)", Lists.newArrayList("jsonMap"), row3, jsonStr});
+
+ GenericRow row4 = new GenericRow();
+ jsonStr = "[{\"one\":1,\"two\":\"too\"},{\"one\":11,\"two\":\"roo\"}]";
+ row4.putValue("jsonMap", JsonUtils
+ .stringToObject(jsonStr, List.class));
+ inputs.add(new Object[]{"json_format(jsonMap)", Lists.newArrayList("jsonMap"), row4, jsonStr});
+
+ GenericRow row5 = new GenericRow();
+ jsonStr = "[{\"one\":1,\"two\":{\"sub1\":1.1,\"sub2\":1.2},\"three\":[\"a\",\"b\"]},{\"one\":11,\"two\":{\"sub1\":11.1,\"sub2\":11.2},\"three\":[\"aa\",\"bb\"]}]";
+ row5.putValue("jsonMap", JsonUtils
+ .stringToObject(jsonStr, List.class));
+ inputs.add(new Object[]{"json_format(jsonMap)", Lists.newArrayList("jsonMap"), row5, jsonStr});
+
+ return inputs.toArray(new Object[0][]);
+ }
}
diff --git a/pinot-core/src/test/resources/data/test_data-mv.avro b/pinot-core/src/test/resources/data/test_data-mv.avro
index 151e091..aff4520 100644
Binary files a/pinot-core/src/test/resources/data/test_data-mv.avro and b/pinot-core/src/test/resources/data/test_data-mv.avro differ
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index b9ac3cd..339ab67 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -25,7 +25,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
-import org.apache.pinot.spi.data.readers.RecordReaderUtils;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -48,7 +47,7 @@ public class AvroRecordExtractor implements RecordExtractor<GenericRecord> {
public GenericRow extract(GenericRecord from, GenericRow to) {
if (_extractAll) {
Map<String, Object> jsonMap = JsonUtils.genericRecordToJson(from);
- jsonMap.forEach((fieldName, value) -> to.putValue(fieldName, RecordReaderUtils.convert(value)));
+ jsonMap.forEach((fieldName, value) -> to.putValue(fieldName, AvroUtils.convert(value)));
} else {
for (String fieldName : _fields) {
Object value = from.get(fieldName);
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
index f4ac41c..c089bfc 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -284,59 +285,83 @@ public class AvroUtils {
}
/**
- * Converts the value to a single-valued value or a multi-valued value
+ * Converts the value to either a single value (String, Number, ByteBuffer), a multi value (Object[]) or a Map
+ *
+ * Natively, Pinot only understands single values and multi values.
+ * A Map is useful only if an ingestion transform function operates on it in the transformation layer
*/
public static Object convert(Object value) {
+ if (value == null) {
+ return null;
+ }
Object convertedValue;
if (value instanceof Collection) {
convertedValue = handleMultiValue((Collection) value);
} else if (value instanceof Map) {
convertedValue = handleMap((Map) value);
+ } else if (value instanceof GenericData.Record) {
+ convertedValue = handleGenericRecord((GenericData.Record) value);
} else {
- convertedValue = handleSingleValue(value);
+ convertedValue = RecordReaderUtils.convertSingleValue(value);
}
return convertedValue;
}
/**
- * Converts the value to a single-valued value by handling instance of ByteBuffer, Number and String
+ * Handles the conversion of each value of the Collection.
+ * Converts the Collection to an Object array
*/
- public static Object handleSingleValue(@Nullable Object value) {
- if (value == null) {
+ public static Object handleMultiValue(Collection values) {
+
+ if (values.isEmpty()) {
return null;
}
- if (value instanceof GenericData.Record) {
- return handleSingleValue(((GenericData.Record) value).get(0));
+ int numValues = values.size();
+ Object[] array = new Object[numValues];
+ int index = 0;
+ for (Object value : values) {
+ Object convertedValue = convert(value);
+ if (convertedValue != null && !convertedValue.toString().equals("")) {
+ array[index++] = convertedValue;
+ }
+ }
+ if (index == numValues) {
+ return array;
+ } else if (index == 0) {
+ return null;
+ } else {
+ return Arrays.copyOf(array, index);
}
- return RecordReaderUtils.convertSingleValue(value);
}
/**
- * Converts the value to a multi-valued column
+ * Handles the conversion of every value of the Map
*/
- public static Object handleMultiValue(@Nullable Collection values) {
- if (values == null || values.isEmpty()) {
+ public static Object handleMap(Map map) {
+ if (map.isEmpty()) {
return null;
}
- int numValues = values.size();
- List<Object> list = new ArrayList<>(numValues);
- for (Object value : values) {
- list.add(handleSingleValue(value));
+
+ Map<Object, Object> convertedMap = new HashMap<>();
+ for (Object key : map.keySet()) {
+ convertedMap.put(RecordReaderUtils.convertSingleValue(key), convert(map.get(key)));
}
- return RecordReaderUtils.convertMultiValue(list);
+ return convertedMap;
}
/**
- * Converts the values within the map to single-valued values
+ * Handles the conversion of every field of the GenericRecord
*/
- public static Object handleMap(@Nullable Map map) {
- if (map == null || map.isEmpty()) {
+ private static Object handleGenericRecord(GenericData.Record record) {
+ List<Field> fields = record.getSchema().getFields();
+ if (fields.isEmpty()) {
return null;
}
Map<Object, Object> convertedMap = new HashMap<>();
- for (Object key : map.keySet()) {
- convertedMap.put(RecordReaderUtils.convertSingleValue(key), RecordReaderUtils.convertSingleValue(map.get(key)));
+ for (Field field : fields) {
+ String fieldName = field.name();
+ convertedMap.put(fieldName, convert(record.get(fieldName)));
}
return convertedMap;
}
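For readers skimming the diff, a minimal sketch (not part of the commit) of what the recursive AvroUtils.convert now returns for a nested record; the schema mirrors complexFieldSchema from the new test:

    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.pinot.plugin.inputformat.avro.AvroUtils;

    public class AvroConvertSketch {
      public static void main(String[] args) {
        // A record with an int and a long field, like complexField in the tests
        Schema schema = SchemaBuilder.record("complexField").fields()
            .requiredInt("field1").requiredLong("field2").endRecord();
        GenericData.Record record = new GenericData.Record(schema);
        record.put("field1", 100);
        record.put("field2", 1588469340000L);
        // convert() now recurses into the record (instead of taking record.get(0))
        // and returns a Map with one entry per field
        Object converted = AvroUtils.convert(record);
        System.out.println(converted);  // e.g. {field1=100, field2=1588469340000}
      }
    }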
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorComplexTypesTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorComplexTypesTest.java
new file mode 100644
index 0000000..d7584ed
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorComplexTypesTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.avro;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
+
+import static org.apache.avro.Schema.*;
+
+
+/**
+ * Tests the {@link AvroRecordExtractor} for Avro Map and Avro Record
+ */
+public class AvroRecordExtractorComplexTypesTest extends AbstractRecordExtractorTest {
+
+ private final File _dataFile = new File(_tempDir, "complex.avro");
+ Schema avroSchema;
+ Schema intStringMapAvroSchema;
+ Schema stringIntMapAvroSchema;
+ Schema simpleRecordSchema;
+ Schema complexRecordSchema;
+ Schema complexFieldSchema;
+ Schema complexListSchema;
+
+ @Override
+ protected List<Map<String, Object>> getInputRecords() {
+
+ // map with int keys
+ intStringMapAvroSchema = createMap(create(Type.STRING));
+
+ // map with string keys
+ stringIntMapAvroSchema = createMap(create(Type.INT));
+
+ // simple record - contains a string, long and double array
+ simpleRecordSchema = createRecord("simpleRecord", null, null, false);
+ simpleRecordSchema.setFields(Lists.newArrayList(new Field("simpleField1", create(Type.STRING), null, null),
+ new Field("simpleField2", create(Type.LONG), null, null),
+ new Field("simpleList", createArray(create(Type.DOUBLE)), null, null)));
+
+ // complex record - contains a string, a complex field (contains int and long)
+ complexRecordSchema = createRecord("complexRecord", null, null, false);
+ complexFieldSchema = createRecord("complexField", null, null, false);
+ complexFieldSchema.setFields(Lists.newArrayList(new Field("field1", create(Type.INT), null, null),
+ new Field("field2", create(Type.LONG), null, null)));
+ complexRecordSchema.setFields(Lists.newArrayList(new Field("simpleField", create(Type.STRING), null, null),
+ new Field("complexField", complexFieldSchema, null, null)));
+
+ // complex list element - each element contains a record of int and long
+ complexListSchema = createRecord("complexList", null, null, false);
+ complexListSchema.setFields(Lists.newArrayList(new Field("field1", create(Type.INT), null, null),
+ new Field("field2", create(Type.LONG), null, null)));
+
+ Field map1Field = new Field("map1", intStringMapAvroSchema, null, null);
+ Field map2Field = new Field("map2", stringIntMapAvroSchema, null, null);
+ Field simpleRecordField = new Field("simpleRecord", simpleRecordSchema, null, null);
+ Field complexRecordField = new Field("complexRecord", complexRecordSchema, null, null);
+ Field complexListField = new Field("complexList", createArray(complexListSchema), null, null);
+
+ avroSchema = createRecord("manyComplexTypes", null, null, false);
+ avroSchema
+ .setFields(Lists.newArrayList(map1Field, map2Field, simpleRecordField, complexRecordField, complexListField));
+
+ List<Map<String, Object>> inputRecords = new ArrayList<>(2);
+ inputRecords.add(getRecord1());
+ inputRecords.add(getRecord2());
+ return inputRecords;
+ }
+
+ private Map<String, Object> getRecord1() {
+ Map<String, Object> record1 = new HashMap<>();
+
+ Map<Integer, String> map1 = new HashMap<>();
+ map1.put(30, "foo");
+ map1.put(200, "bar");
+ record1.put("map1", map1);
+
+ Map<String, Integer> map2 = new HashMap<>();
+ map2.put("k1", 10000);
+ map2.put("k2", 20000);
+ record1.put("map2", map2);
+
+ GenericRecord simpleRecord = new GenericData.Record(simpleRecordSchema);
+ simpleRecord.put("simpleField1", "foo");
+ simpleRecord.put("simpleField2", 1588469340000L);
+ simpleRecord.put("simpleList", Arrays.asList(1.1, 2.2));
+ record1.put("simpleRecord", simpleRecord);
+
+ GenericRecord complexRecord = new GenericData.Record(complexRecordSchema);
+ GenericRecord subComplexRecord = new GenericData.Record(complexFieldSchema);
+ subComplexRecord.put("field1", 100);
+ subComplexRecord.put("field2", 1588469340000L);
+ complexRecord.put("simpleField", "foo");
+ complexRecord.put("complexField", subComplexRecord);
+ record1.put("complexRecord", complexRecord);
+
+ GenericRecord listElem1 = new GenericData.Record(complexListSchema);
+ listElem1.put("field1", 20);
+ listElem1.put("field2", 2000200020002000L);
+ GenericRecord listElem2 = new GenericData.Record(complexListSchema);
+ listElem2.put("field1", 280);
+ listElem2.put("field2", 8000200020002000L);
+ record1.put("complexList", Arrays.asList(listElem1, listElem2));
+
+ return record1;
+ }
+
+ private Map<String, Object> getRecord2() {
+ Map<String, Object> record2 = new HashMap<>();
+ Map<Integer, String> map1 = new HashMap<>();
+ map1.put(30, "moo");
+ map1.put(200, "baz");
+ record2.put("map1", map1);
+
+ Map<String, Integer> map2 = new HashMap<>();
+ map2.put("k1", 100);
+ map2.put("k2", 200);
+ record2.put("map2", map2);
+
+ GenericRecord simpleRecord2 = new GenericData.Record(simpleRecordSchema);
+ simpleRecord2.put("simpleField1", "foo");
+ simpleRecord2.put("simpleField2", 1588469340000L);
+ simpleRecord2.put("simpleList", Arrays.asList(1.1, 2.2));
+ record2.put("simpleRecord", simpleRecord2);
+
+ GenericRecord complexRecord2 = new GenericData.Record(complexRecordSchema);
+ GenericRecord subComplexRecord2 = new GenericData.Record(complexFieldSchema);
+ subComplexRecord2.put("field1", 100);
+ subComplexRecord2.put("field2", 1588469340000L);
+ complexRecord2.put("simpleField", "foo");
+ complexRecord2.put("complexField", subComplexRecord2);
+ record2.put("complexRecord", complexRecord2);
+
+ GenericRecord listElem12 = new GenericData.Record(complexListSchema);
+ listElem12.put("field1", 20);
+ listElem12.put("field2", 2000200020002000L);
+ GenericRecord listElem22 = new GenericData.Record(complexListSchema);
+ listElem22.put("field1", 280);
+ listElem22.put("field2", 8000200020002000L);
+ record2.put("complexList", Arrays.asList(listElem12, listElem22));
+
+ return record2;
+ }
+
+ @Override
+ protected Set<String> getSourceFields() {
+ return Sets.newHashSet("map1", "map2", "simpleRecord", "complexRecord", "complexList");
+ }
+
+ /**
+ * Create an AvroRecordReader
+ */
+ @Override
+ protected RecordReader createRecordReader(Set<String> fieldsToRead)
+ throws IOException {
+ AvroRecordReader avroRecordReader = new AvroRecordReader();
+ avroRecordReader.init(_dataFile, fieldsToRead, null);
+ return avroRecordReader;
+ }
+
+ /**
+ * Create an Avro input file using the input records containing maps and records
+ */
+ @Override
+ protected void createInputFile()
+ throws IOException {
+
+ try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ fileWriter.create(avroSchema, _dataFile);
+ for (Map<String, Object> inputRecord : _inputRecords) {
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ for (String columnName : _sourceFieldNames) {
+ record.put(columnName, inputRecord.get(columnName));
+ }
+ fileWriter.append(record);
+ }
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
deleted file mode 100644
index 552c011..0000000
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.inputformat.avro;
-
-import com.google.common.collect.Sets;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
-import org.apache.pinot.spi.data.readers.RecordReader;
-
-import static org.apache.avro.Schema.*;
-
-
-/**
- * Tests the {@link AvroRecordExtractor} using a schema containing groovy transform functions for Avro maps
- */
-public class AvroRecordExtractorMapTypeTest extends AbstractRecordExtractorTest {
-
- private final File _dataFile = new File(_tempDir, "maps.avro");
-
- @Override
- protected List<Map<String, Object>> getInputRecords() {
- List<Map<String, Object>> inputRecords = new ArrayList<>(2);
- Map<String, Object> record1 = new HashMap<>();
- Map<Integer, String> map1 = new HashMap<>();
- map1.put(30, "foo");
- map1.put(200, "bar");
- Map<String, Integer> map2 = new HashMap<>();
- map2.put("k1", 10000);
- map2.put("k2", 20000);
- record1.put("map1", map1);
- record1.put("map2", map2);
- inputRecords.add(record1);
-
- Map<String, Object> record2 = new HashMap<>();
- map1 = new HashMap<>();
- map1.put(30, "moo");
- map1.put(200, "baz");
- map2 = new HashMap<>();
- map2.put("k1", 100);
- map2.put("k2", 200);
- record2.put("map1", map1);
- record2.put("map2", map2);
- inputRecords.add(record2);
-
- return inputRecords;
- }
-
- @Override
- protected Set<String> getSourceFields() {
- return Sets.newHashSet("map1", "map2");
- }
-
- /**
- * Create an AvroRecordReader
- */
- @Override
- protected RecordReader createRecordReader(Set<String> fieldsToRead)
- throws IOException {
- AvroRecordReader avroRecordReader = new AvroRecordReader();
- avroRecordReader.init(_dataFile, fieldsToRead, null);
- return avroRecordReader;
- }
-
- /**
- * Create an Avro input file using the input records containing maps
- */
- @Override
- protected void createInputFile()
- throws IOException {
- org.apache.avro.Schema avroSchema = createRecord("mapRecord", null, null, false);
- org.apache.avro.Schema intStringMapAvroSchema = createMap(create(Type.STRING));
- org.apache.avro.Schema stringIntMapAvroSchema = createMap(create(Type.INT));
- List<Field> fields = Arrays.asList(new Field("map1", intStringMapAvroSchema, null, null),
- new Field("map2", stringIntMapAvroSchema, null, null));
- avroSchema.setFields(fields);
-
- try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, _dataFile);
- for (Map<String, Object> inputRecord : _inputRecords) {
- GenericData.Record record = new GenericData.Record(avroSchema);
- for (String columnName : _sourceFieldNames) {
- record.put(columnName, inputRecord.get(columnName));
- }
- fileWriter.append(record);
- }
- }
- }
-}
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
index a806a8d..678265d 100644
--- a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.plugin.inputformat.json;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
@@ -47,66 +45,18 @@ public class JSONRecordExtractor implements RecordExtractor<Map<String, Object>>
@Override
public GenericRow extract(Map<String, Object> from, GenericRow to) {
if (_extractAll) {
- from.forEach((fieldName, value) -> to.putValue(fieldName, convertValue(value)));
+ from.forEach((fieldName, value) -> to.putValue(fieldName, JSONRecordExtractorUtils.convertValue(value)));
} else {
for (String fieldName : _fields) {
Object value = from.get(fieldName);
// NOTE about JSON behavior - cannot distinguish between INT/LONG and FLOAT/DOUBLE.
// DataTypeTransformer fixes it.
- Object convertedValue = convertValue(value);
+ Object convertedValue = JSONRecordExtractorUtils.convertValue(value);
to.putValue(fieldName, convertedValue);
}
}
return to;
}
- private Object convertValue(Object value) {
- Object convertedValue;
- if (value instanceof Collection) {
- convertedValue = convertMultiValue((Collection) value);
- } else {
- convertedValue = convertSingleValue(value);
- }
- return convertedValue;
- }
-
- /**
- * Converts the value to a single-valued value
- */
- @Nullable
- private Object convertSingleValue(@Nullable Object value) {
- if (value == null) {
- return null;
- }
- if (value instanceof Number) {
- return value;
- }
- return value.toString();
- }
- /**
- * Converts the value to a multi-valued value
- */
- @Nullable
- private Object convertMultiValue(@Nullable Collection values) {
- if (values == null || values.isEmpty()) {
- return null;
- }
- int numValues = values.size();
- Object[] array = new Object[numValues];
- int index = 0;
- for (Object value : values) {
- Object convertedValue = convertSingleValue(value);
- if (convertedValue != null && !convertedValue.toString().equals("")) {
- array[index++] = convertedValue;
- }
- }
- if (index == numValues) {
- return array;
- } else if (index == 0) {
- return null;
- } else {
- return Arrays.copyOf(array, index);
- }
- }
}
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtils.java
similarity index 51%
copy from pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
copy to pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtils.java
index a806a8d..47e7927 100644
--- a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtils.java
@@ -20,83 +20,56 @@ package org.apache.pinot.plugin.inputformat.json;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordExtractor;
-import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
/**
- * Extractor for JSON records
+ * Helper methods for converting values from json nodes to values Pinot understands
*/
-public class JSONRecordExtractor implements RecordExtractor<Map<String, Object>> {
+public final class JSONRecordExtractorUtils {
- private Set<String> _fields;
- private boolean _extractAll = false;
+ private JSONRecordExtractorUtils() {}
- @Override
- public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) {
- _fields = fields;
- if (fields == null || fields.isEmpty()) {
- _extractAll = true;
- }
- }
-
- @Override
- public GenericRow extract(Map<String, Object> from, GenericRow to) {
- if (_extractAll) {
- from.forEach((fieldName, value) -> to.putValue(fieldName, convertValue(value)));
- } else {
- for (String fieldName : _fields) {
- Object value = from.get(fieldName);
- // NOTE about JSON behavior - cannot distinguish between INT/LONG and FLOAT/DOUBLE.
- // DataTypeTransformer fixes it.
- Object convertedValue = convertValue(value);
- to.putValue(fieldName, convertedValue);
- }
+ /**
+ * Converts the value to either a single value (String, Number), a multi value (Object[]) or a Map
+ *
+ * Natively, Pinot only understands single values and multi values.
+ * A Map is useful only if an ingestion transform function operates on it in the transformation layer
+ */
+ public static Object convertValue(Object value) {
+ if (value == null) {
+ return null;
}
- return to;
- }
-
- private Object convertValue(Object value) {
Object convertedValue;
if (value instanceof Collection) {
convertedValue = convertMultiValue((Collection) value);
+ } else if (value instanceof Map) {
+ convertedValue = convertMap((Map) value);
} else {
- convertedValue = convertSingleValue(value);
+ if (value instanceof Number) {
+ convertedValue = value;
+ } else {
+ convertedValue = value.toString();
+ }
}
return convertedValue;
}
/**
- * Converts the value to a single-valued value
+ * Applies conversion to each element of the collection
*/
@Nullable
- private Object convertSingleValue(@Nullable Object value) {
- if (value == null) {
- return null;
- }
- if (value instanceof Number) {
- return value;
- }
- return value.toString();
- }
-
- /**
- * Converts the value to a multi-valued value
- */
- @Nullable
- private Object convertMultiValue(@Nullable Collection values) {
- if (values == null || values.isEmpty()) {
+ private static Object convertMultiValue(Collection values) {
+ if (values.isEmpty()) {
return null;
}
int numValues = values.size();
Object[] array = new Object[numValues];
int index = 0;
for (Object value : values) {
- Object convertedValue = convertSingleValue(value);
+ Object convertedValue = convertValue(value);
if (convertedValue != null && !convertedValue.toString().equals("")) {
array[index++] = convertedValue;
}
@@ -109,4 +82,19 @@ public class JSONRecordExtractor implements RecordExtractor<Map<String, Object>>
return Arrays.copyOf(array, index);
}
}
+
+ /**
+ * Applies the conversion to each value of the map
+ */
+ @Nullable
+ private static Object convertMap(Map map) {
+ if (map.isEmpty()) {
+ return null;
+ }
+ Map<Object, Object> convertedMap = new HashMap<>();
+ for (Object key : map.keySet()) {
+ convertedMap.put(key, convertValue(map.get(key)));
+ }
+ return convertedMap;
+ }
}
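A small usage sketch (again, not part of the commit) of the deep JSON conversion; the keys mirror the myMap2 fixture in the new test:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.plugin.inputformat.json.JSONRecordExtractorUtils;

    public class JsonConvertSketch {
      public static void main(String[] args) {
        Map<String, Object> nested = new HashMap<>();
        nested.put("sub1", 10);
        nested.put("sub2", 1.0);
        Map<String, Object> value = new HashMap<>();
        value.put("k3", nested);                  // nested map stays a Map
        value.put("k4", "baz");                   // single value passes through
        value.put("k5", Arrays.asList(1, 2, 3));  // collection becomes Object[]
        Map<?, ?> converted = (Map<?, ?>) JSONRecordExtractorUtils.convertValue(value);
        System.out.println(converted.get("k5") instanceof Object[]);  // true
      }
    }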
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtilsTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtilsTest.java
new file mode 100644
index 0000000..8821aed
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtilsTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+
+public class JSONRecordExtractorUtilsTest {
+
+ @Test(dataProvider = "conversionTestData")
+ public void testConversion(Object value, Object expectedConvertedValue)
+ throws JsonProcessingException {
+ Object convertedValue = JSONRecordExtractorUtils.convertValue(value);
+ Assert.assertEquals(JsonUtils.objectToString(convertedValue), JsonUtils.objectToString(expectedConvertedValue));
+ }
+
+ @DataProvider(name = "conversionTestData")
+ public Object[][] getConversionTestData()
+ throws IOException {
+ List<Object[]> input = new ArrayList<>();
+
+ String jsonString = "{\n"
+ + " \"myInt\": 10,\n"
+ + " \"myLong\": 1588469340000,\n"
+ + " \"myDouble\": 10.2,\n"
+ + " \"myNull\": null,\n"
+ + " \"myString\": \"foo\",\n"
+ + " \"myIntArray\": [10, 20, 30],\n"
+ + " \"myStringArray\": [\"foo\", null, \"bar\"],\n"
+ + " \"myDoubleArray\": [10.2, 12.1, 1.1],\n"
+ + " \"myComplexArray1\": [{\"one\": 1, \"two\": \"too\"}, {\"one\": 11, \"two\": \"roo\"}],\n"
+ + " \"myComplexArray2\": [{\"one\":1, \"two\": {\"sub1\":1.1, \"sub2\": 1.2}, \"three\":[\"a\", \"b\"]}, {\"one\":11, \"two\": {\"sub1\":11.1, \"sub2\": 11.2}, \"three\":[\"aa\", \"bb\"]}],\n"
+ + " \"myMap1\": {\"k1\": \"foo\", \"k2\": \"bar\"},\n"
+ + " \"myMap2\": {\"k3\": {\"sub1\": 10, \"sub2\": 1.0}, \"k4\": \"baz\", \"k5\": [1,2,3]}\n" + "}";
+ Map<String, Object> jsonNode = new ObjectMapper().readValue(jsonString, Map.class);
+ input.add(new Object[]{jsonNode.get("myNull"), null});
+
+ input.add(new Object[]{jsonNode.get("myInt"), 10});
+
+ input.add(new Object[]{jsonNode.get("myLong"), 1588469340000L});
+
+ input.add(new Object[]{jsonNode.get("myDouble"), 10.2});
+
+ input.add(new Object[]{jsonNode.get("myString"), "foo"});
+
+ input.add(new Object[]{jsonNode.get("myIntArray"), new Object[]{10, 20, 30}});
+
+ input.add(new Object[]{jsonNode.get("myDoubleArray"), new Object[]{10.2, 12.1, 1.1}});
+
+ input.add(new Object[]{jsonNode.get("myStringArray"), new Object[]{"foo", "bar"}});
+
+ Map<String, Object> map1 = new HashMap<>();
+ map1.put("one", 1);
+ map1.put("two", "too");
+ Map<String, Object> map2 = new HashMap<>();
+ map2.put("one", 11);
+ map2.put("two", "roo");
+ input.add(new Object[]{jsonNode.get("myComplexArray1"), new Object[]{map1, map2}});
+
+ Map<String, Object> map3 = new HashMap<>();
+ map3.put("one", 1);
+ Map<String, Object> map31 = new HashMap<>();
+ map31.put("sub1", 1.1);
+ map31.put("sub2", 1.2);
+ map3.put("two", map31);
+ map3.put("three", new Object[]{"a", "b"});
+ Map<String, Object> map4 = new HashMap<>();
+ map4.put("one", 11);
+ Map<String, Object> map41 = new HashMap<>();
+ map41.put("sub1", 11.1);
+ map41.put("sub2", 11.2);
+ map4.put("two", map41);
+ map4.put("three", new Object[]{"aa", "bb"});
+ input.add(new Object[]{jsonNode.get("myComplexArray2"), new Object[]{map3, map4}});
+
+ Map<String, Object> map5 = new HashMap<>();
+ map5.put("k1", "foo");
+ map5.put("k2", "bar");
+ input.add(new Object[]{jsonNode.get("myMap1"), map5});
+
+ Map<String, Object> map6 = new HashMap<>();
+ Map<String, Object> map61 = new HashMap<>();
+ map61.put("sub1", 10);
+ map61.put("sub2", 1.0);
+ map6.put("k3", map61);
+ map6.put("k4", "baz");
+ map6.put("k5", new Object[]{1, 2, 3});
+ input.add(new Object[]{jsonNode.get("myMap2"), map6});
+
+ return input.toArray(new Object[0][]);
+ }
+
+}
\ No newline at end of file
diff --git a/pinot-server/src/test/resources/data/test_data-mv.avro b/pinot-server/src/test/resources/data/test_data-mv.avro
index 151e091..aff4520 100644
Binary files a/pinot-server/src/test/resources/data/test_data-mv.avro and b/pinot-server/src/test/resources/data/test_data-mv.avro differ
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
index 36c919f..f1fc762 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
@@ -23,6 +23,9 @@ import java.util.Set;
/**
* Extracts fields from input records
+ * 1) Number/String/ByteBuffer values become a single-value column
+ * 2) Collections become an Object[], i.e. a multi-value column
+ * 3) Nested/complex fields (e.g. JSON maps, Avro maps, Avro records) become a Map<Object, Object>
* @param <T> The format of the input record
*/
public interface RecordExtractor<T> {
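To make the contract above concrete, a hand-written sketch (field names borrowed from the tests, not from this interface) of what a GenericRow holds after extraction:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.spi.data.readers.GenericRow;

    public class ExtractionContractSketch {
      public static void main(String[] args) {
        GenericRow row = new GenericRow();
        // 1) single value
        row.putValue("myLong", 1588469340000L);
        // 2) multi value: collections surface as Object[]
        row.putValue("myIntArray", new Object[]{10, 20, 30});
        // 3) nested structures surface as Map<Object, Object>
        Map<Object, Object> myMap = new HashMap<>();
        myMap.put("sub1", 10);
        row.putValue("myMap1", myMap);
        System.out.println(row.getValue("myMap1"));  // {sub1=10}
      }
    }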
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
index e3a1f4a..052209a 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.avro.generic.GenericData;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.data.Schema;
import org.testng.Assert;
@@ -114,25 +115,35 @@ public abstract class AbstractRecordExtractorTest {
String columnName = entry.getKey();
Object expectedValue = entry.getValue();
Object actualValue = genericRow.getValue(columnName);
- if (expectedValue instanceof Collection) {
- List actualArray =
- actualValue instanceof List ? (ArrayList) actualValue : Arrays.asList((Object[]) actualValue);
- List expectedArray = (List) expectedValue;
- for (int j = 0; j < actualArray.size(); j++) {
- Assert.assertEquals(actualArray.get(j), expectedArray.get(j));
- }
- } else if (expectedValue instanceof Map) {
- Map<Object, Object> actualMap = (HashMap) actualValue;
- Map<Object, Object> expectedMap = (HashMap) expectedValue;
- for (Map.Entry<Object, Object> mapEntry : expectedMap.entrySet()) {
- Assert.assertEquals(actualMap.get(mapEntry.getKey().toString()), mapEntry.getValue());
- }
+ checkValue(expectedValue, actualValue);
+ }
+ }
+
+ private void checkValue(Object expectedValue, Object actualValue) {
+ if (expectedValue instanceof Collection) {
+ List actualArray =
+ actualValue instanceof List ? (ArrayList) actualValue : Arrays.asList((Object[]) actualValue);
+ List expectedArray = (List) expectedValue;
+ for (int j = 0; j < actualArray.size(); j++) {
+ checkValue(expectedArray.get(j), actualArray.get(j));
+ }
+ } else if (expectedValue instanceof Map) {
+ Map<Object, Object> actualMap = (HashMap) actualValue;
+ Map<Object, Object> expectedMap = (HashMap) expectedValue;
+ for (Map.Entry<Object, Object> mapEntry : expectedMap.entrySet()) {
+ Assert.assertEquals(actualMap.get(mapEntry.getKey().toString()), mapEntry.getValue());
+ }
+ } else if (expectedValue instanceof GenericData.Record) {
+ Map<Object, Object> actualMap = (HashMap) actualValue;
+ GenericData.Record expectedGenericRecord = (GenericData.Record) expectedValue;
+ for (Map.Entry<Object, Object> mapEntry : actualMap.entrySet()) {
+ checkValue(expectedGenericRecord.get(mapEntry.getKey().toString()), mapEntry.getValue());
+ }
+ } else {
+ if (expectedValue != null) {
+ Assert.assertEquals(actualValue, expectedValue);
} else {
- if (expectedValue != null) {
- Assert.assertEquals(actualValue, expectedValue);
- } else {
- Assert.assertNull(actualValue);
- }
+ Assert.assertNull(actualValue);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]