Jackie-Jiang commented on a change in pull request #6928:
URL: https://github.com/apache/incubator-pinot/pull/6928#discussion_r633250076
##########
File path:
pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
##########
@@ -277,4 +280,86 @@ public static DataType extractFieldDataType(Field field) {
return fieldSchema;
}
}
+
+ private static void
extractSchemaWithComplexTypeHandling(org.apache.avro.Schema fieldSchema,
+ List<String> unnestFields, String delimiter, String path, Schema
pinotSchema,
+ @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable
TimeUnit timeUnit) {
+ org.apache.avro.Schema.Type fieldType = fieldSchema.getType();
+ if (fieldType == org.apache.avro.Schema.Type.UNION) {
+ org.apache.avro.Schema nonNullSchema = null;
+ for (org.apache.avro.Schema childFieldSchema : fieldSchema.getTypes()) {
+ if (childFieldSchema.getType() != org.apache.avro.Schema.Type.NULL) {
+ if (nonNullSchema == null) {
+ nonNullSchema = childFieldSchema;
+ } else {
+ throw new IllegalStateException("More than one non-null schema in
UNION schema");
+ }
+ }
+ }
+ if (nonNullSchema != null) {
+ extractSchemaWithComplexTypeHandling(nonNullSchema, unnestFields,
delimiter, path, pinotSchema, fieldTypeMap,
+ timeUnit);
+ } else {
+ throw new IllegalStateException("Cannot find non-null schema in UNION
schema");
+ }
+ } else if (fieldType == org.apache.avro.Schema.Type.RECORD) {
+ for (Field innerField : fieldSchema.getFields()) {
+ extractSchemaWithComplexTypeHandling(innerField.schema(),
unnestFields, delimiter,
+ concat(delimiter, path, innerField.name()), pinotSchema,
fieldTypeMap, timeUnit);
+ }
+ } else if (fieldType == org.apache.avro.Schema.Type.ARRAY) {
+ org.apache.avro.Schema elementType = fieldSchema.getElementType();
+ if (unnestFields.contains(path)) {
+ extractSchemaWithComplexTypeHandling(elementType, unnestFields,
delimiter, path, pinotSchema, fieldTypeMap,
+ timeUnit);
+ } else if (AvroSchemaUtil.isPrimitiveType(elementType.getType())) {
+ addFieldToPinotSchema(pinotSchema,
AvroSchemaUtil.valueOf(elementType.getType()), path, false, fieldTypeMap,
+ timeUnit);
+ } else {
+ addFieldToPinotSchema(pinotSchema, DataType.STRING, path, true,
fieldTypeMap, timeUnit);
+ }
+ } else {
+ DataType dataType = AvroSchemaUtil.valueOf(fieldType);
+ addFieldToPinotSchema(pinotSchema, dataType, path, true, fieldTypeMap,
timeUnit);
+ }
+ }
+
+ private static void addFieldToPinotSchema(Schema pinotSchema, DataType
dataType, String name,
+ boolean isSingleValueField, @Nullable Map<String, FieldSpec.FieldType>
fieldTypeMap,
+ @Nullable TimeUnit timeUnit) {
+ if (fieldTypeMap == null) {
+ pinotSchema.addField(new DimensionFieldSpec(name, dataType,
isSingleValueField));
+ } else {
+ FieldSpec.FieldType fieldType =
+ fieldTypeMap.containsKey(name) ? fieldTypeMap.get(name) :
FieldSpec.FieldType.DIMENSION;
+ Preconditions.checkNotNull(fieldType, "Field type not specified for
field: %s", name);
+ switch (fieldType) {
+ case DIMENSION:
+ pinotSchema.addField(new DimensionFieldSpec(name, dataType,
isSingleValueField));
+ break;
+ case METRIC:
+ Preconditions.checkState(isSingleValueField, "Metric field: %s
cannot be multi-valued", name);
+ pinotSchema.addField(new MetricFieldSpec(name, dataType));
+ break;
+ case TIME:
+ Preconditions.checkState(isSingleValueField, "Time field: %s cannot
be multi-valued", name);
+ Preconditions.checkNotNull(timeUnit, "Time unit cannot be null");
+ pinotSchema.addField(new TimeFieldSpec(new
TimeGranularitySpec(dataType, timeUnit, name)));
+ break;
+ case DATE_TIME:
+ Preconditions.checkState(isSingleValueField, "Time field: %s cannot
be multi-valued", name);
+ Preconditions.checkNotNull(timeUnit, "Time unit cannot be null");
+ pinotSchema.addField(new DateTimeFieldSpec(name, dataType,
+ new DateTimeFormatSpec(1, timeUnit.toString(),
DateTimeFieldSpec.TimeFormat.EPOCH.toString()).getFormat(),
+ new DateTimeGranularitySpec(1, timeUnit).getGranularity()));
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported field type: " +
fieldType + " for field: " + name);
+ }
+ }
+ }
+
+ private static String concat(String delimiter, String path, String
component) {
Review comment:
Would inlining this method directly be more readable?
##########
File path:
pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
##########
@@ -277,4 +280,86 @@ public static DataType extractFieldDataType(Field field) {
return fieldSchema;
}
}
+
+ private static void
extractSchemaWithComplexTypeHandling(org.apache.avro.Schema fieldSchema,
+ List<String> unnestFields, String delimiter, String path, Schema
pinotSchema,
+ @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable
TimeUnit timeUnit) {
+ org.apache.avro.Schema.Type fieldType = fieldSchema.getType();
+ if (fieldType == org.apache.avro.Schema.Type.UNION) {
Review comment:
Consider using a switch on `fieldType` instead of this if/else-if chain?
##########
File path:
pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroSchemaUtil.java
##########
@@ -52,6 +52,25 @@ public static DataType valueOf(Schema.Type avroType) {
}
}
+ /**
+ * @return if the given avro type is a primitive type.
+ */
+ public static boolean isPrimitiveType(Schema.Type avroType) {
+ switch (avroType) {
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case STRING:
+ case ENUM:
+ case BYTES:
Review comment:
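Pinot does not currently support multi-value (MV) BYTES columns. Arrays whose element type is primitive are added as multi-value fields, so treating BYTES as primitive here would produce an MV BYTES column. I'm not sure we want to count BYTES as a primitive type.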
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]