This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c177e2be6b35 feat(schema): Migrate json and proto converters to use
HoodieSchema (#17740)
c177e2be6b35 is described below
commit c177e2be6b3509f61a2b9cf90c05ae77ddaeaa6b
Author: Tim Brown <[email protected]>
AuthorDate: Fri Jan 2 00:38:59 2026 -0500
feat(schema): Migrate json and proto converters to use HoodieSchema (#17740)
* migrate json and proto converters to use HoodieSchema
* fix type handling
* fix uuid case
* fix decimal to row converter
* fix proto converter issues
* address nits
---
LICENSE | 10 -
.../java/org/apache/hudi/avro/JsonEncoder.java | 348 ---------------------
.../apache/hudi/avro/MercifulJsonConverter.java | 144 ++++-----
.../processors/DecimalLogicalTypeProcessor.java | 27 +-
.../processors/DurationLogicalTypeProcessor.java | 11 +-
.../hudi/avro/processors/EnumTypeProcessor.java | 5 +-
.../hudi/avro/processors/FixedTypeProcessor.java | 4 +-
.../hudi/avro/processors/JsonFieldProcessor.java | 7 +-
.../LocalTimestampMicroLogicalTypeProcessor.java | 5 +-
.../LocalTimestampMilliLogicalTypeProcessor.java | 5 +-
.../avro/processors/TimeLogicalTypeProcessor.java | 8 +-
.../processors/TimeMicroLogicalTypeProcessor.java | 5 +-
.../processors/TimeMilliLogicalTypeProcessor.java | 5 +-
.../TimestampMicroLogicalTypeProcessor.java | 5 +-
.../TimestampMilliLogicalTypeProcessor.java | 5 +-
.../org/apache/hudi/common/HoodieJsonPayload.java | 3 +-
.../hudi/common/util/LocalAvroSchemaCache.java | 67 ----
.../schema/convert/InternalSchemaConverter.java | 26 +-
.../hudi/avro/TestMercifulJsonConverter.java | 48 +--
.../hudi/common/testutils/RawTripTestPayload.java | 7 +-
.../hudi/common/testutils/SchemaTestUtil.java | 2 +-
.../hudi/common/util/TestLocalAvroSchemaCache.java | 66 ----
.../schema/utils/TestAvroSchemaEvolutionUtils.java | 8 +-
.../utilities/sources/helpers/AvroConvertor.java | 2 +-
.../helpers/MercifulJsonToRowConverter.java | 57 ++--
.../sources/helpers/ProtoConversionUtil.java | 106 +++----
26 files changed, 227 insertions(+), 759 deletions(-)
diff --git a/LICENSE b/LICENSE
index e8ef41942d28..301ea869628b 100644
--- a/LICENSE
+++ b/LICENSE
@@ -358,13 +358,3 @@ Copyright (c) 2005, European Commission project OneLab
under contract 034819 (ht
Home page: https://github.com/streamsets/datacollector-oss
License: http://www.apache.org/licenses/LICENSE-2.0
-
-
-------------------------------------------------------------------------------
-
- This product includes code from Apache Avro
-
- * org.apache.hudi.avro.JsonEncoder adapted from org.apache.avro.io.JsonEncoder
-
- Copyright: 2010-2019 The Apache Software Foundation
- Home page: https://avro.apache.org
- License: http://www.apache.org/licenses/LICENSE-2.0
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/JsonEncoder.java
b/hudi-common/src/main/java/org/apache/hudi/avro/JsonEncoder.java
deleted file mode 100644
index 37f67f8df80f..000000000000
--- a/hudi-common/src/main/java/org/apache/hudi/avro/JsonEncoder.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.avro;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.avro.AvroTypeException;
-import org.apache.avro.Schema;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.ParsingEncoder;
-import org.apache.avro.io.parsing.JsonGrammarGenerator;
-import org.apache.avro.io.parsing.Parser;
-import org.apache.avro.io.parsing.Symbol;
-import org.apache.avro.util.Utf8;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.BitSet;
-import java.util.EnumSet;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * An {@link Encoder} for Avro's JSON data encoding.
- *
- * <p>NOTE: This class is a copy of Avro's JsonEncoder class, with the only
difference being that
- * this class does not wrap union types in JSON by overriding {@link
#writeIndex(int)}.
- *
- * <p>By default, Avro's JSON encoding for union types is to wrap the union
value
- * in a JSON object with the type name as the key (e.g., {"string": "value"}).
- * This encoder overrides that behavior to write just the value, resulting in
cleaner JSON output.
- *
- * <p>For instance, a union with schema ["null", "string"] would be encoded as
just
- * "value" instead of {"string": "value"}.
- *
- * <p>This encoder is particularly useful when the standard Avro JSON format's
verbosity
- * for union types is not desired.
- */
-public class JsonEncoder extends ParsingEncoder implements
Parser.ActionHandler {
- private static final String LINE_SEPARATOR =
System.getProperty("line.separator");
- final Parser parser;
- private JsonGenerator out;
- @Getter
- @Setter
- private boolean includeNamespace = true;
-
- /**
- * Has anything been written into the collections?
- */
- protected BitSet isEmpty = new BitSet();
-
- public JsonEncoder(Schema sc, OutputStream out) throws IOException {
- this(sc, getJsonGenerator(out, EnumSet.noneOf(JsonOptions.class)));
- }
-
- JsonEncoder(Schema sc, OutputStream out, boolean pretty) throws IOException {
- this(sc, getJsonGenerator(out, pretty ? EnumSet.of(JsonOptions.Pretty) :
EnumSet.noneOf(JsonOptions.class)));
- }
-
- JsonEncoder(Schema sc, OutputStream out, Set<JsonOptions> options) throws
IOException {
- this(sc, getJsonGenerator(out, options));
- }
-
- JsonEncoder(Schema sc, JsonGenerator out) throws IOException {
- configure(out);
- this.parser = new Parser(new JsonGrammarGenerator().generate(sc), this);
- }
-
- @Override
- public void flush() throws IOException {
- parser.processImplicitActions();
- if (out != null) {
- out.flush();
- }
- }
-
- enum JsonOptions {
- Pretty,
-
- // Prevent underlying outputstream to be flush for optimisation purpose.
- NoFlushStream
- }
-
- // by default, one object per line.
- // with pretty option use default pretty printer with root line separator.
- private static JsonGenerator getJsonGenerator(OutputStream out,
Set<JsonOptions> options) throws IOException {
- Objects.requireNonNull(out, "OutputStream cannot be null");
- JsonGenerator g = new JsonFactory().createJsonGenerator(out,
JsonEncoding.UTF8);
- if (options.contains(JsonOptions.NoFlushStream)) {
- g = g.configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false);
- }
- MinimalPrettyPrinter pp = new MinimalPrettyPrinter();
- pp.setRootValueSeparator(LINE_SEPARATOR);
- g.setPrettyPrinter(pp);
- return g;
- }
-
- /**
- * Reconfigures this JsonEncoder to use the output stream provided.
- * <p/>
- * If the OutputStream provided is null, a NullPointerException is thrown.
- * <p/>
- * Otherwise, this JsonEncoder will flush its current output and then
- * reconfigure its output to use a default UTF8 JsonGenerator that writes to
the
- * provided OutputStream.
- *
- * @param out The OutputStream to direct output to. Cannot be null.
- * @return this JsonEncoder
- * @throws IOException
- * @throws NullPointerException if {@code out} is {@code null}
- */
- public JsonEncoder configure(OutputStream out) throws IOException {
- return this.configure(out, true);
- }
-
- /**
- * Reconfigures this JsonEncoder to use the output stream provided.
- * <p/>
- * If the OutputStream provided is null, a NullPointerException is thrown.
- * <p/>
- * Otherwise, this JsonEncoder will flush its current output and then
- * reconfigure its output to use a default UTF8 JsonGenerator that writes to
the
- * provided OutputStream.
- *
- * @param out The OutputStream to direct output to. Cannot be null.
- * @return this JsonEncoder
- * @throws IOException
- * @throws NullPointerException if {@code out} is {@code null}
- */
- public JsonEncoder configure(OutputStream out, boolean autoflush) throws
IOException {
- EnumSet<JsonOptions> jsonOptions = EnumSet.noneOf(JsonOptions.class);
- if (!autoflush) {
- jsonOptions.add(JsonOptions.NoFlushStream);
- }
- this.configure(getJsonGenerator(out, jsonOptions));
- return this;
- }
-
- /**
- * Reconfigures this JsonEncoder to output to the JsonGenerator provided.
- * <p/>
- * If the JsonGenerator provided is null, a NullPointerException is thrown.
- * <p/>
- * Otherwise, this JsonEncoder will flush its current output and then
- * reconfigure its output to use the provided JsonGenerator.
- *
- * @param generator The JsonGenerator to direct output to. Cannot be null.
- * @return this JsonEncoder
- * @throws IOException
- * @throws NullPointerException if {@code generator} is {@code null}
- */
- private JsonEncoder configure(JsonGenerator generator) throws IOException {
- Objects.requireNonNull(generator, "JsonGenerator cannot be null");
- if (null != parser) {
- flush();
- }
- this.out = generator;
- return this;
- }
-
- @Override
- public void writeNull() throws IOException {
- parser.advance(Symbol.NULL);
- out.writeNull();
- }
-
- @Override
- public void writeBoolean(boolean b) throws IOException {
- parser.advance(Symbol.BOOLEAN);
- out.writeBoolean(b);
- }
-
- @Override
- public void writeInt(int n) throws IOException {
- parser.advance(Symbol.INT);
- out.writeNumber(n);
- }
-
- @Override
- public void writeLong(long n) throws IOException {
- parser.advance(Symbol.LONG);
- out.writeNumber(n);
- }
-
- @Override
- public void writeFloat(float f) throws IOException {
- parser.advance(Symbol.FLOAT);
- out.writeNumber(f);
- }
-
- @Override
- public void writeDouble(double d) throws IOException {
- parser.advance(Symbol.DOUBLE);
- out.writeNumber(d);
- }
-
- @Override
- public void writeString(Utf8 utf8) throws IOException {
- writeString(utf8.toString());
- }
-
- @Override
- public void writeString(String str) throws IOException {
- parser.advance(Symbol.STRING);
- if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) {
- parser.advance(Symbol.MAP_KEY_MARKER);
- out.writeFieldName(str);
- } else {
- out.writeString(str);
- }
- }
-
- @Override
- public void writeBytes(ByteBuffer bytes) throws IOException {
- if (bytes.hasArray()) {
- writeBytes(bytes.array(), bytes.position(), bytes.remaining());
- } else {
- byte[] b = new byte[bytes.remaining()];
- bytes.duplicate().get(b);
- writeBytes(b);
- }
- }
-
- @Override
- public void writeBytes(byte[] bytes, int start, int len) throws IOException {
- parser.advance(Symbol.BYTES);
- writeByteArray(bytes, start, len);
- }
-
- private void writeByteArray(byte[] bytes, int start, int len) throws
IOException {
- out.writeString(new String(bytes, start, len,
StandardCharsets.ISO_8859_1));
- }
-
- @Override
- public void writeFixed(byte[] bytes, int start, int len) throws IOException {
- parser.advance(Symbol.FIXED);
- Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
- if (len != top.size) {
- throw new AvroTypeException(
- "Incorrect length for fixed binary: expected " + top.size + " but
received " + len + " bytes.");
- }
- writeByteArray(bytes, start, len);
- }
-
- @Override
- public void writeEnum(int e) throws IOException {
- parser.advance(Symbol.ENUM);
- Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol();
- if (e < 0 || e >= top.size) {
- throw new AvroTypeException("Enumeration out of range: max is " +
top.size + " but received " + e);
- }
- out.writeString(top.getLabel(e));
- }
-
- @Override
- public void writeArrayStart() throws IOException {
- parser.advance(Symbol.ARRAY_START);
- out.writeStartArray();
- push();
- isEmpty.set(depth());
- }
-
- @Override
- public void writeArrayEnd() throws IOException {
- if (!isEmpty.get(pos)) {
- parser.advance(Symbol.ITEM_END);
- }
- pop();
- parser.advance(Symbol.ARRAY_END);
- out.writeEndArray();
- }
-
- @Override
- public void writeMapStart() throws IOException {
- push();
- isEmpty.set(depth());
-
- parser.advance(Symbol.MAP_START);
- out.writeStartObject();
- }
-
- @Override
- public void writeMapEnd() throws IOException {
- if (!isEmpty.get(pos)) {
- parser.advance(Symbol.ITEM_END);
- }
- pop();
-
- parser.advance(Symbol.MAP_END);
- out.writeEndObject();
- }
-
- @Override
- public void startItem() throws IOException {
- if (!isEmpty.get(pos)) {
- parser.advance(Symbol.ITEM_END);
- }
- super.startItem();
- isEmpty.clear(depth());
- }
-
- @Override
- public void writeIndex(int unionIndex) throws IOException {
- // Do not write index for union types.
- parser.advance(Symbol.UNION);
- Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol();
- Symbol symbol = top.getSymbol(unionIndex);
- parser.pushSymbol(symbol);
- }
-
- @Override
- public Symbol doAction(Symbol input, Symbol top) throws IOException {
- if (top instanceof Symbol.FieldAdjustAction) {
- Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
- out.writeFieldName(fa.fname);
- } else if (top == Symbol.RECORD_START) {
- out.writeStartObject();
- } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) {
- out.writeEndObject();
- } else if (top != Symbol.FIELD_END) {
- throw new AvroTypeException("Unknown action symbol " + top);
- }
- return null;
- }
-}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
index f12a07c8a84e..3a7e6d7cae1f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
@@ -31,6 +31,9 @@ import
org.apache.hudi.avro.processors.TimeMicroLogicalTypeProcessor;
import org.apache.hudi.avro.processors.TimeMilliLogicalTypeProcessor;
import org.apache.hudi.avro.processors.TimestampMicroLogicalTypeProcessor;
import org.apache.hudi.avro.processors.TimestampMilliLogicalTypeProcessor;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -42,8 +45,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
@@ -70,7 +71,7 @@ public class MercifulJsonConverter {
// For each schema (keyed by full name), stores a mapping of schema field
name to json field name to account for sanitization of fields
private static final Map<String, Map<String, String>>
SANITIZED_FIELD_MAPPINGS = new ConcurrentHashMap<>();
- private final Map<Schema.Type, JsonFieldProcessor> fieldTypeProcessorMap;
+ private final Map<HoodieSchemaType, JsonFieldProcessor>
fieldTypeProcessorMap;
private final Map<String, JsonFieldProcessor> fieldLogicalTypeProcessorMap;
protected final ObjectMapper mapper;
@@ -111,7 +112,7 @@ public class MercifulJsonConverter {
* @param json Json record
* @param schema Schema
*/
- public GenericRecord convert(String json, Schema schema) {
+ public GenericRecord convert(String json, HoodieSchema schema) {
try {
Map<String, Object> jsonObjectMap = mapper.readValue(json, Map.class);
return convertJsonToAvro(jsonObjectMap, schema);
@@ -128,9 +129,9 @@ public class MercifulJsonConverter {
SANITIZED_FIELD_MAPPINGS.remove(schemaFullName);
}
- private GenericRecord convertJsonToAvro(Map<String, Object> inputJson,
Schema schema) {
- GenericRecord avroRecord = new GenericData.Record(schema);
- for (Schema.Field f : schema.getFields()) {
+ private GenericRecord convertJsonToAvro(Map<String, Object> inputJson,
HoodieSchema schema) {
+ GenericRecord avroRecord = new GenericData.Record(schema.toAvroSchema());
+ for (HoodieSchemaField f : schema.getFields()) {
Object val = shouldSanitize ? getFieldFromJson(f, inputJson,
schema.getFullName(), invalidCharMask) : inputJson.get(f.name());
if (val != null) {
avroRecord.put(f.pos(), convertJsonField(val, f.name(), f.schema()));
@@ -139,7 +140,7 @@ public class MercifulJsonConverter {
return avroRecord;
}
- protected static Object getFieldFromJson(final Schema.Field fieldSchema,
final Map<String, Object> inputJson, final String schemaFullName, final String
invalidCharMask) {
+ protected static Object getFieldFromJson(final HoodieSchemaField
fieldSchema, final Map<String, Object> inputJson, final String schemaFullName,
final String invalidCharMask) {
Map<String, String> schemaToJsonFieldNames =
SANITIZED_FIELD_MAPPINGS.computeIfAbsent(schemaFullName, unused -> new
ConcurrentHashMap<>());
if (!schemaToJsonFieldNames.containsKey(fieldSchema.name())) {
// if we don't have field mapping, proactively populate as many as
possible based on input json
@@ -164,25 +165,13 @@ public class MercifulJsonConverter {
return null;
}
- private Schema getNonNull(Schema schema) {
- List<Schema> types = schema.getTypes();
- Schema.Type firstType = types.get(0).getType();
- return firstType.equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
- }
-
- private boolean isOptional(Schema schema) {
- return schema.getType().equals(Schema.Type.UNION) &&
schema.getTypes().size() == 2
- && (schema.getTypes().get(0).getType().equals(Schema.Type.NULL)
- || schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
- }
-
- protected Object convertJsonField(Object value, String name, Schema schema) {
+ protected Object convertJsonField(Object value, String name, HoodieSchema
schema) {
- if (isOptional(schema)) {
+ if (schema.isNullable()) {
if (value == null) {
return null;
} else {
- schema = getNonNull(schema);
+ schema = schema.getNonNullType();
}
} else if (value == null) {
// Always fail on null for non-nullable schemas
@@ -193,18 +182,18 @@ public class MercifulJsonConverter {
return convertField(value, name, schema);
}
- private Object convertField(Object value, String name, Schema schema) {
+ private Object convertField(Object value, String name, HoodieSchema schema) {
JsonFieldProcessor processor = getProcessorForSchema(schema);
return processor.convertField(value, name, schema);
}
- protected JsonFieldProcessor getProcessorForSchema(Schema schema) {
+ protected JsonFieldProcessor getProcessorForSchema(HoodieSchema schema) {
JsonFieldProcessor processor = null;
// 3 cases to consider: customized logicalType, logicalType, and type.
- String customizedLogicalType = schema.getProp("logicalType");
- LogicalType logicalType = schema.getLogicalType();
- Type type = schema.getType();
+ String customizedLogicalType = (String) schema.getProp("logicalType");
+ LogicalType logicalType = schema.toAvroSchema().getLogicalType();
+ HoodieSchemaType type = schema.getType();
if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
processor = fieldLogicalTypeProcessorMap.get(customizedLogicalType);
} else if (logicalType != null) {
@@ -221,20 +210,20 @@ public class MercifulJsonConverter {
/**
* Build type processor map for each avro type.
*/
- private Map<Schema.Type, JsonFieldProcessor> getFieldTypeProcessors() {
- Map<Schema.Type, JsonFieldProcessor> fieldTypeProcessors = new
EnumMap<>(Schema.Type.class);
- fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
- fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
- fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
- fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
- fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
- fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
- fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
- fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
- fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
- fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
- fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
- fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
+ private Map<HoodieSchemaType, JsonFieldProcessor> getFieldTypeProcessors() {
+ Map<HoodieSchemaType, JsonFieldProcessor> fieldTypeProcessors = new
EnumMap<>(HoodieSchemaType.class);
+ fieldTypeProcessors.put(HoodieSchemaType.STRING,
generateStringTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.BOOLEAN,
generateBooleanTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.DOUBLE,
generateDoubleTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.FLOAT,
generateFloatTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.INT, generateIntTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.LONG, generateLongTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.ARRAY,
generateArrayTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.RECORD,
generateRecordTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.ENUM, generateEnumTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.MAP, generateMapTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.BYTES,
generateBytesTypeHandler());
+ fieldTypeProcessors.put(HoodieSchemaType.FIXED,
generateFixedTypeHandler());
return Collections.unmodifiableMap(fieldTypeProcessors);
}
@@ -290,37 +279,34 @@ public class MercifulJsonConverter {
private class DecimalToAvroLogicalTypeProcessor extends
DecimalLogicalTypeProcessor {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
if (!isValidDecimalTypeConfig(schema)) {
return Pair.of(false, null);
}
// Case 1: Input is a list. It is expected to be raw Fixed byte array
input, and we only support
// parsing it to Fixed avro type.
- if (value instanceof List<?> && schema.getType() == Type.FIXED) {
+ HoodieSchema.Decimal decimalSchema = (HoodieSchema.Decimal) schema;
+ if (value instanceof List<?> && decimalSchema.isFixed()) {
JsonFieldProcessor processor = generateFixedTypeHandler();
return processor.convert(value, name, schema);
}
// Case 2: Input is a number or String number.
- LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
- Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value,
schema);
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.toAvroSchema().getLogicalType();
+ Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value,
decimalSchema);
if (Boolean.FALSE.equals(parseResult.getLeft())) {
return Pair.of(false, null);
}
BigDecimal bigDecimal = parseResult.getRight();
- switch (schema.getType()) {
- case BYTES:
- // Convert to primitive Arvo type that logical type Decimal uses.
- ByteBuffer byteBuffer = new
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
- return Pair.of(true, byteBuffer);
- case FIXED:
- GenericFixed fixedValue = new
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
- return Pair.of(true, fixedValue);
- default: {
- return Pair.of(false, null);
- }
+ if (decimalSchema.isFixed()) {
+ GenericFixed fixedValue = new
Conversions.DecimalConversion().toFixed(bigDecimal, schema.toAvroSchema(),
decimalType);
+ return Pair.of(true, fixedValue);
+ } else {
+ // Convert to primitive Avro type that logical type Decimal uses.
+ ByteBuffer byteBuffer = new
Conversions.DecimalConversion().toBytes(bigDecimal, schema.toAvroSchema(),
decimalType);
+ return Pair.of(true, byteBuffer);
}
}
}
@@ -331,7 +317,7 @@ public class MercifulJsonConverter {
* Convert the given object to Avro object with schema whose logical type
is duration.
*/
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
if (!isValidDurationTypeConfig(schema)) {
return Pair.of(false, null);
@@ -350,7 +336,7 @@ public class MercifulJsonConverter {
for (Integer element : converval) {
buffer.putInt(element); // months
}
- return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
+ return Pair.of(true, new GenericData.Fixed(schema.toAvroSchema(),
buffer.array()));
}
}
@@ -358,7 +344,7 @@ public class MercifulJsonConverter {
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
return convertCommon(
new Parser.IntParser() {
@Override
@@ -382,7 +368,7 @@ public class MercifulJsonConverter {
protected JsonFieldProcessor generateBooleanTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
if (value instanceof Boolean) {
return Pair.of(true, value);
}
@@ -394,7 +380,7 @@ public class MercifulJsonConverter {
protected JsonFieldProcessor generateIntTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
if (value instanceof Number) {
return Pair.of(true, ((Number) value).intValue());
} else if (value instanceof String) {
@@ -408,7 +394,7 @@ public class MercifulJsonConverter {
protected JsonFieldProcessor generateDoubleTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
if (value instanceof Number) {
return Pair.of(true, ((Number) value).doubleValue());
} else if (value instanceof String) {
@@ -422,7 +408,7 @@ public class MercifulJsonConverter {
protected JsonFieldProcessor generateFloatTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
if (value instanceof Number) {
return Pair.of(true, ((Number) value).floatValue());
} else if (value instanceof String) {
@@ -436,7 +422,7 @@ public class MercifulJsonConverter {
protected JsonFieldProcessor generateLongTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
if (value instanceof Number) {
return Pair.of(true, ((Number) value).longValue());
} else if (value instanceof String) {
@@ -455,7 +441,7 @@ public class MercifulJsonConverter {
private static final ObjectMapper STRING_MAPPER = new ObjectMapper();
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
if (value instanceof String) {
return Pair.of(true, value);
} else {
@@ -471,7 +457,7 @@ public class MercifulJsonConverter {
protected JsonFieldProcessor generateBytesTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
// Should return ByteBuffer (see GenericData.isBytes())
return Pair.of(true, ByteBuffer.wrap(value.toString().getBytes()));
}
@@ -484,16 +470,16 @@ public class MercifulJsonConverter {
private static class AvroFixedTypeProcessor extends FixedTypeProcessor {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
return Pair.of(true, new GenericData.Fixed(
- schema, convertToJavaObject(value, name, schema)));
+ schema.toAvroSchema(), convertToJavaObject(value, name, schema)));
}
}
private static class AvroEnumTypeProcessor extends EnumTypeProcessor {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
- return Pair.of(true, new GenericData.EnumSymbol(schema,
convertToJavaObject(value, name, schema)));
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
+ return Pair.of(true, new GenericData.EnumSymbol(schema.toAvroSchema(),
convertToJavaObject(value, name, schema)));
}
}
@@ -504,7 +490,7 @@ public class MercifulJsonConverter {
protected JsonFieldProcessor generateRecordTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
return Pair.of(true, convertJsonToAvro((Map<String, Object>) value,
schema));
}
};
@@ -512,8 +498,8 @@ public class MercifulJsonConverter {
protected JsonFieldProcessor generateArrayTypeHandler() {
return new JsonFieldProcessor() {
- private List<Object> convertToJavaObject(Object value, String name,
Schema schema) {
- Schema elementSchema = schema.getElementType();
+ private List<Object> convertToJavaObject(Object value, String name,
HoodieSchema schema) {
+ HoodieSchema elementSchema = schema.getElementType();
List<Object> listRes = new ArrayList<>();
for (Object v : (List) value) {
listRes.add(convertJsonField(v, name, elementSchema));
@@ -522,9 +508,9 @@ public class MercifulJsonConverter {
}
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
return Pair.of(true, new GenericData.Array<>(
- schema,
+ schema.toAvroSchema(),
convertToJavaObject(
value,
name,
@@ -536,8 +522,8 @@ public class MercifulJsonConverter {
protected JsonFieldProcessor generateMapTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
- Schema valueSchema = schema.getValueType();
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
+ HoodieSchema valueSchema = schema.getValueType();
Map<String, Object> mapRes = new HashMap<>();
for (Map.Entry<String, Object> v : ((Map<String, Object>)
value).entrySet()) {
mapRes.put(v.getKey(), convertJsonField(v.getValue(), name,
valueSchema));
@@ -547,7 +533,7 @@ public class MercifulJsonConverter {
};
}
- protected HoodieJsonToAvroConversionException
buildConversionException(Object value, String fieldName, Schema schema, boolean
shouldSanitize, String invalidCharMask) {
+ protected HoodieJsonToAvroConversionException
buildConversionException(Object value, String fieldName, HoodieSchema schema,
boolean shouldSanitize, String invalidCharMask) {
String errorMsg;
if (shouldSanitize) {
errorMsg = String.format("Json to Avro Type conversion error for field
%s, %s for %s. Field sanitization is enabled with a mask of %s.", fieldName,
value, schema, invalidCharMask);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
index 93e02dd49836..fc8a6aeea48c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
@@ -18,11 +18,11 @@
package org.apache.hudi.avro.processors;
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
import java.math.BigDecimal;
import java.math.MathContext;
@@ -34,15 +34,15 @@ public abstract class DecimalLogicalTypeProcessor extends
JsonFieldProcessor {
/**
* Check if the given schema is a valid decimal type configuration.
*/
- protected static boolean isValidDecimalTypeConfig(Schema schema) {
- LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ protected static boolean isValidDecimalTypeConfig(HoodieSchema schema) {
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.toAvroSchema().getLogicalType();
    // When the schema is found invalid at parse time, Avro's Schema.parse will just silently
    // set the logical type to null instead of throwing an exception. Correspondingly, we just check if it is null here.
if (decimalType == null) {
return false;
}
// Even though schema is validated at schema parsing phase, still validate
here to be defensive.
- decimalType.validate(schema);
+ decimalType.validate(schema.toAvroSchema());
return true;
}
@@ -53,27 +53,25 @@ public abstract class DecimalLogicalTypeProcessor extends
JsonFieldProcessor {
* @return Pair object, with left as boolean indicating if the parsing was
successful and right as the
* BigDecimal value.
*/
- protected static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object
obj, Schema schema) {
+ protected static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object
obj, HoodieSchema.Decimal schema) {
BigDecimal bigDecimal = null;
- LogicalTypes.Decimal logicalType = (LogicalTypes.Decimal)
schema.getLogicalType();
try {
if (obj instanceof BigDecimal) {
- bigDecimal = ((BigDecimal) obj).setScale(logicalType.getScale(),
RoundingMode.UNNECESSARY);
+ bigDecimal = ((BigDecimal) obj).setScale(schema.getScale(),
RoundingMode.UNNECESSARY);
} else if (obj instanceof String) {
// Case 2: Object is a number in String format.
try {
//encoded big decimal
- bigDecimal =
HoodieAvroUtils.convertBytesToBigDecimal(decodeStringToBigDecimalBytes(obj),
- (LogicalTypes.Decimal) schema.getLogicalType());
+ bigDecimal =
HoodieSchemaUtils.convertBytesToBigDecimal(decodeStringToBigDecimalBytes(obj),
schema);
} catch (IllegalArgumentException e) {
//no-op
}
}
      // Non-fixed-byte input, or a failed fixed-byte conversion, ends up here.
if (bigDecimal == null) {
- bigDecimal = new BigDecimal(obj.toString(), new
MathContext(logicalType.getPrecision(),
RoundingMode.UNNECESSARY)).setScale(logicalType.getScale(),
RoundingMode.UNNECESSARY);
+ bigDecimal = new BigDecimal(obj.toString(), new
MathContext(schema.getPrecision(),
RoundingMode.UNNECESSARY)).setScale(schema.getScale(),
RoundingMode.UNNECESSARY);
}
- } catch (java.lang.NumberFormatException | ArithmeticException ignored) {
+ } catch (NumberFormatException | ArithmeticException ignored) {
/* ignore */
}
@@ -85,9 +83,8 @@ public abstract class DecimalLogicalTypeProcessor extends
JsonFieldProcessor {
// Allowed: 123.45, 123, 0.12
    // Disallowed: 1234 (a 4-digit integer while the scale has already reserved 2 digits out of the 5-digit precision)
// 123456, 0.12345
- LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
- if (bigDecimal.scale() > decimalType.getScale()
- || (bigDecimal.precision() - bigDecimal.scale()) >
(decimalType.getPrecision() - decimalType.getScale())) {
+ if (bigDecimal.scale() > schema.getScale()
+ || (bigDecimal.precision() - bigDecimal.scale()) >
(schema.getPrecision() - schema.getScale())) {
// Correspond to case
// org.apache.avro.AvroTypeException: Cannot encode decimal with scale 5
as scale 2 without rounding.
// org.apache.avro.AvroTypeException: Cannot encode decimal with scale 3
as scale 2 without rounding
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
index 47e37db5c502..de0f0605bbd2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
@@ -19,9 +19,10 @@
package org.apache.hudi.avro.processors;
import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.avro.LogicalType;
-import org.apache.avro.Schema;
import java.util.List;
@@ -50,15 +51,15 @@ public abstract class DurationLogicalTypeProcessor extends
JsonFieldProcessor {
/**
   * Check if the given schema is a valid duration type configuration.
*/
- protected static boolean isValidDurationTypeConfig(Schema schema) {
+ protected static boolean isValidDurationTypeConfig(HoodieSchema schema) {
String durationTypeName = AvroLogicalTypeEnum.DURATION.getValue();
- LogicalType durationType = schema.getLogicalType();
- String durationTypeProp = schema.getProp("logicalType");
+ LogicalType durationType = schema.toAvroSchema().getLogicalType();
+ String durationTypeProp = (String) schema.getProp("logicalType");
// 1. The Avro type should be "Fixed".
    // 2. Fixed size must be 12 bytes as it holds 3 integers.
// 3. Logical type name should be "duration". The name might be stored in
different places based on Avro version
// being used here.
- return schema.getType().equals(Schema.Type.FIXED)
+ return schema.getType().equals(HoodieSchemaType.FIXED)
&& schema.getFixedSize() == Integer.BYTES *
NUM_ELEMENTS_FOR_DURATION_TYPE
&& (durationType != null &&
durationType.getName().equals(durationTypeName)
|| durationTypeProp != null &&
durationTypeProp.equals(durationTypeName));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
index d21ace8a8a5f..4920edf7e4c3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
@@ -18,13 +18,12 @@
package org.apache.hudi.avro.processors;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
-import org.apache.avro.Schema;
-
public abstract class EnumTypeProcessor extends JsonFieldProcessor {
- protected Object convertToJavaObject(Object value, String name, Schema
schema) {
+ protected Object convertToJavaObject(Object value, String name, HoodieSchema
schema) {
if (schema.getEnumSymbols().contains(value.toString())) {
return value.toString();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
index 0604ddad19c1..9050b76fc050 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
@@ -18,12 +18,12 @@
package org.apache.hudi.avro.processors;
-import org.apache.avro.Schema;
+import org.apache.hudi.common.schema.HoodieSchema;
import java.util.List;
public abstract class FixedTypeProcessor extends JsonFieldProcessor {
- protected byte[] convertToJavaObject(Object value, String name, Schema
schema) {
+ protected byte[] convertToJavaObject(Object value, String name, HoodieSchema
schema) {
// The ObjectMapper use List to represent FixedType
// eg: "decimal_val": [0, 0, 14, -63, -52] will convert to
ArrayList<Integer>
List<Integer> converval = (List<Integer>) value;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
index 519e129e151a..3fffa03b6843 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
@@ -18,16 +18,15 @@
package org.apache.hudi.avro.processors;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
-import org.apache.avro.Schema;
-
import java.io.Serializable;
public abstract class JsonFieldProcessor implements Serializable {
- public Object convertField(Object value, String name, Schema schema) {
+ public Object convertField(Object value, String name, HoodieSchema schema) {
Pair<Boolean, Object> res = convert(value, name, schema);
if (!res.getLeft()) {
throw new HoodieJsonToAvroConversionException("failed to convert json to
avro");
@@ -35,5 +34,5 @@ public abstract class JsonFieldProcessor implements
Serializable {
return res.getRight();
}
- public abstract Pair<Boolean, Object> convert(Object value, String name,
Schema schema);
+ public abstract Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
index 20ea08de9d8f..616657d4492c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
@@ -19,10 +19,9 @@
package org.apache.hudi.avro.processors;
import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.avro.Schema;
-
import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
@@ -36,7 +35,7 @@ public class LocalTimestampMicroLogicalTypeProcessor extends
TimeLogicalTypeProc
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
return convertCommon(
new Parser.LongParser() {
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
index 62753daf4844..d69374002a68 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
@@ -19,10 +19,9 @@
package org.apache.hudi.avro.processors;
import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.avro.Schema;
-
import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
@@ -33,7 +32,7 @@ public class LocalTimestampMilliLogicalTypeProcessor extends
TimeLogicalTypeProc
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
return convertCommon(
new Parser.LongParser() {
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
index 9cb3b20fb0b8..467a4c36f313 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
@@ -19,10 +19,10 @@
package org.apache.hudi.avro.processors;
import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.LogicalType;
-import org.apache.avro.Schema;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -60,12 +60,12 @@ public abstract class TimeLogicalTypeProcessor extends
JsonFieldProcessor {
/**
* Main function that convert input to Object with java data type specified
by schema
*/
- public Pair<Boolean, Object> convertCommon(Parser parser, Object value,
Schema schema) {
- LogicalType logicalType = schema.getLogicalType();
+ public Pair<Boolean, Object> convertCommon(Parser parser, Object value,
HoodieSchema schema) {
+ LogicalType logicalType = schema.toAvroSchema().getLogicalType();
if (logicalType == null) {
return Pair.of(false, null);
}
- logicalType.validate(schema);
+
if (value instanceof Number) {
return parser.handleNumberValue((Number) value);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
index 75a011ed28eb..80e3063d5e22 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
@@ -19,10 +19,9 @@
package org.apache.hudi.avro.processors;
import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.avro.Schema;
-
/**
* Processor for TimeMicro logical type.
*/
@@ -33,7 +32,7 @@ public class TimeMicroLogicalTypeProcessor extends
TimeLogicalTypeProcessor {
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
return convertCommon(
new Parser.LongParser() {
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
index f6f2ed2c2bfe..d23eccd74ba1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
@@ -19,10 +19,9 @@
package org.apache.hudi.avro.processors;
import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.avro.Schema;
-
/**
* Processor for TimeMilli logical type.
*/
@@ -33,7 +32,7 @@ public class TimeMilliLogicalTypeProcessor extends
TimeLogicalTypeProcessor {
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
return convertCommon(
new Parser.IntParser() {
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
index b1cf73533827..d8472e1397df 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
@@ -19,10 +19,9 @@
package org.apache.hudi.avro.processors;
import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.avro.Schema;
-
import java.time.Instant;
import java.time.temporal.ChronoField;
@@ -36,7 +35,7 @@ public class TimestampMicroLogicalTypeProcessor extends
TimeLogicalTypeProcessor
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
return convertCommon(
new Parser.LongParser() {
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
index 5c5493f38b3a..780ec332a3f5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
@@ -19,10 +19,9 @@
package org.apache.hudi.avro.processors;
import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.avro.Schema;
-
import java.time.Instant;
import java.time.temporal.ChronoField;
@@ -36,7 +35,7 @@ public class TimestampMilliLogicalTypeProcessor extends
TimeLogicalTypeProcessor
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
return convertCommon(
new Parser.LongParser() {
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
index 0fb36c4648bf..e6667384f582 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common;
import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
@@ -64,7 +65,7 @@ public class HoodieJsonPayload implements
HoodieRecordPayload<HoodieJsonPayload>
@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws
IOException {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
- return Option.of(jsonConverter.convert(getJsonData(), schema));
+ return Option.of(jsonConverter.convert(getJsonData(),
HoodieSchema.fromAvroSchema(schema)));
}
private String getJsonData() throws IOException {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
deleted file mode 100644
index fd55f8981e1b..000000000000
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.util;
-
-import org.apache.avro.Schema;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * An avro schema cache implementation for managing different version of
schemas.
- * This is a local cache; the versionId only works for local thread in one
container/executor.
- * A map of {version_id, schema} is maintained.
- */
-@NotThreadSafe
-@Deprecated
-public class LocalAvroSchemaCache implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final Map<Integer, Schema> versionIdToSchema; // the mapping from
version_id -> schema
- private final Map<Schema, Integer> schemaToVersionId; // the mapping from
schema -> version_id
-
- private int nextVersionId = 0;
-
- private LocalAvroSchemaCache() {
- this.versionIdToSchema = new HashMap<>();
- this.schemaToVersionId = new HashMap<>();
- }
-
- public static LocalAvroSchemaCache getInstance() {
- return new LocalAvroSchemaCache();
- }
-
- public Integer cacheSchema(Schema schema) {
- Integer versionId = this.schemaToVersionId.get(schema);
- if (versionId == null) {
- versionId = nextVersionId++;
- this.schemaToVersionId.put(schema, versionId);
- this.versionIdToSchema.put(versionId, schema);
- }
- return versionId;
- }
-
- public Option<Schema> getSchema(Integer versionId) {
- return Option.ofNullable(this.versionIdToSchema.get(versionId));
- }
-}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
index d89b7f197685..9783ecac1b5a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
+++
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
@@ -31,9 +31,6 @@ import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -504,25 +501,25 @@ public class InternalSchemaConverter {
return HoodieSchema.create(HoodieSchemaType.DOUBLE);
case DATE:
- return
HoodieSchema.fromAvroSchema(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)));
+ return HoodieSchema.createDate();
case TIME:
- return
HoodieSchema.fromAvroSchema(LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)));
+ return HoodieSchema.createTimeMicros();
case TIME_MILLIS:
- return
HoodieSchema.fromAvroSchema(LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)));
+ return HoodieSchema.createTimeMillis();
case TIMESTAMP:
- return
HoodieSchema.fromAvroSchema(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)));
+ return HoodieSchema.createTimestampMicros();
case TIMESTAMP_MILLIS:
- return
HoodieSchema.fromAvroSchema(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)));
+ return HoodieSchema.createTimestampMillis();
case LOCAL_TIMESTAMP_MICROS:
- return
HoodieSchema.fromAvroSchema(LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)));
+ return HoodieSchema.createLocalTimestampMicros();
case LOCAL_TIMESTAMP_MILLIS:
- return
HoodieSchema.fromAvroSchema(LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)));
+ return HoodieSchema.createLocalTimestampMillis();
case STRING:
return HoodieSchema.create(HoodieSchemaType.STRING);
@@ -530,13 +527,8 @@ public class InternalSchemaConverter {
case BINARY:
return HoodieSchema.create(HoodieSchemaType.BYTES);
- case UUID: {
- // NOTE: All schemas corresponding to Hoodie's type [[FIXED]] are
generated
- // with the "fixed" name to stay compatible w/
[[SchemaConverters]]
- String name = recordName + FIELD_NAME_DELIMITER + "fixed";
- Schema fixedSchema = Schema.createFixed(name, null, null, 16);
- return
HoodieSchema.fromAvroSchema(LogicalTypes.uuid().addToSchema(fixedSchema));
- }
+ case UUID:
+ return HoodieSchema.createUUID();
case FIXED: {
Types.FixedType fixed = (Types.FixedType) primitive;
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
index 6b7687133f5b..175da0b2c712 100644
---
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
+++
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
@@ -71,7 +71,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
rec.put("favorite_number", number);
rec.put("favorite_color", color);
- assertEquals(rec, CONVERTER.convert(json, simpleSchema.toAvroSchema()));
+ assertEquals(rec, CONVERTER.convert(json, simpleSchema));
}
@ParameterizedTest
@@ -85,7 +85,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
rec.put("favorite_number", 1337);
rec.put("favorite_color", "10");
- assertEquals(rec, CONVERTER.convert(json, simpleSchema.toAvroSchema()));
+ assertEquals(rec, CONVERTER.convert(json, simpleSchema));
}
/**
@@ -113,7 +113,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToAvroConversionException.class, () -> {
- CONVERTER.convert(json, schema.toAvroSchema());
+ CONVERTER.convert(json, schema);
});
}
@@ -166,7 +166,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
String json = MAPPER.writeValueAsString(data);
- GenericRecord real = CONVERTER.convert(json, schema.toAvroSchema());
+ GenericRecord real = CONVERTER.convert(json, schema);
assertEquals(record, real);
}
@@ -184,10 +184,10 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
HoodieSchema decimalFieldSchema =
schema.getField("decimalField").get().schema();
record.put("decimalField", conv.toBytes(new BigDecimal(expected),
decimalFieldSchema.toAvroSchema(),
decimalFieldSchema.toAvroSchema().getLogicalType()));
- GenericRecord real = CONVERTER.convert(json, schema.toAvroSchema());
+ GenericRecord real = CONVERTER.convert(json, schema);
assertEquals(record, real);
} else {
- assertThrows(HoodieJsonToAvroConversionException.class, () ->
CONVERTER.convert(json, schema.toAvroSchema()));
+ assertThrows(HoodieJsonToAvroConversionException.class, () ->
CONVERTER.convert(json, schema));
}
}
@@ -219,7 +219,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
GenericRecord durationRecord = new
GenericData.Record(schema.toAvroSchema());
durationRecord.put("duration", new
GenericData.Fixed(schema.getField("duration").get().schema().toAvroSchema(),
buffer.array()));
- GenericRecord real = CONVERTER.convert(json, schema.toAvroSchema());
+ GenericRecord real = CONVERTER.convert(json, schema);
assertEquals(durationRecord, real);
}
@@ -233,7 +233,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
HoodieSchema schema =
SchemaTestUtil.getSchemaFromResourceFilePath(schemaFile);
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToAvroConversionException.class, () -> {
- CONVERTER.convert(json, schema.toAvroSchema());
+ CONVERTER.convert(json, schema);
});
}
@@ -256,7 +256,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
Map<String, Object> data = new HashMap<>();
data.put("dateField", dateInput);
String json = MAPPER.writeValueAsString(data);
- GenericRecord real = CONVERTER.convert(json, schema.toAvroSchema());
+ GenericRecord real = CONVERTER.convert(json, schema);
assertEquals(record, real);
}
@@ -276,7 +276,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
data.put("dateField", input);
String json = MAPPER.writeValueAsString(data);
assertThrows(HoodieJsonToAvroConversionException.class, () -> {
- CONVERTER.convert(json, schema.toAvroSchema());
+ CONVERTER.convert(json, schema);
});
}
@@ -306,7 +306,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
data.put("localTimestampMillisField", timeMilli);
data.put("localTimestampMicrosField", timeMicro);
String json = MAPPER.writeValueAsString(data);
- GenericRecord real = CONVERTER.convert(json, schema.toAvroSchema());
+ GenericRecord real = CONVERTER.convert(json, schema);
assertEquals(record, real);
}
@@ -321,7 +321,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
String json = MAPPER.writeValueAsString(data);
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToAvroConversionException.class, () -> {
- CONVERTER.convert(json, schema.toAvroSchema());
+ CONVERTER.convert(json, schema);
});
}
@@ -350,7 +350,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
data.put("timestampMillisField", timeMilli);
data.put("timestampMicrosField", timeMicro);
String json = MAPPER.writeValueAsString(data);
- GenericRecord real = CONVERTER.convert(json, schema.toAvroSchema());
+ GenericRecord real = CONVERTER.convert(json, schema);
assertEquals(record, real);
}
@@ -367,7 +367,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
data.put("timestampMicrosField", badInput);
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToAvroConversionException.class, () -> {
- CONVERTER.convert(MAPPER.writeValueAsString(data),
schema.toAvroSchema());
+ CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
});
data.clear();
@@ -375,7 +375,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
data.put("timestampMicrosField", validInput);
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToAvroConversionException.class, () -> {
- CONVERTER.convert(MAPPER.writeValueAsString(data),
schema.toAvroSchema());
+ CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
});
}
@@ -404,7 +404,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
data.put("timeMicroField", timeMicro);
data.put("timeMillisField", timeMilli);
String json = MAPPER.writeValueAsString(data);
- GenericRecord real = CONVERTER.convert(json, schema.toAvroSchema());
+ GenericRecord real = CONVERTER.convert(json, schema);
assertEquals(record, real);
}
@@ -421,7 +421,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
data.put("timeMillisField", invalidInput);
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToAvroConversionException.class, () -> {
- CONVERTER.convert(MAPPER.writeValueAsString(data),
schema.toAvroSchema());
+ CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
});
data.clear();
@@ -429,7 +429,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
data.put("timeMillisField", validInput);
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToAvroConversionException.class, () -> {
- CONVERTER.convert(MAPPER.writeValueAsString(data),
schema.toAvroSchema());
+ CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
});
}
@@ -452,7 +452,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
Map<String, Object> data = new HashMap<>();
data.put("uuidField", uuid);
String json = MAPPER.writeValueAsString(data);
- GenericRecord real = CONVERTER.convert(json, schema.toAvroSchema());
+ GenericRecord real = CONVERTER.convert(json, schema);
assertEquals(record, real);
}
@@ -480,7 +480,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
userRecord.put("name", "Jane Smith");
userRecord.put("contact", contactRecord);
- assertEquals(userRecord, CONVERTER.convert(json,
nestedSchema.toAvroSchema()));
+ assertEquals(userRecord, CONVERTER.convert(json, nestedSchema));
}
@Test
@@ -502,7 +502,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
rec.put("favorite__number", number);
rec.put("favorite__color__", color);
- assertEquals(rec, CONVERTER.convert(json, sanitizedSchema.toAvroSchema()));
+ assertEquals(rec, CONVERTER.convert(json, sanitizedSchema));
}
@Test
@@ -525,7 +525,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
rec.put("favorite_number", number);
rec.put("favorite_color", color);
- assertEquals(rec, CONVERTER.convert(json, sanitizedSchema.toAvroSchema()));
+ assertEquals(rec, CONVERTER.convert(json, sanitizedSchema));
}
@ParameterizedTest
@@ -546,7 +546,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
String json = MAPPER.writeValueAsString(data);
HoodieSchema tripSchema = HoodieSchema.parse(
TRIP_ENCODED_DECIMAL_SCHEMA.replace("6",
Integer.toString(scale)).replace("10", Integer.toString(precision)));
- GenericRecord genrec = CONVERTER.convert(json, tripSchema.toAvroSchema());
+ GenericRecord genrec = CONVERTER.convert(json, tripSchema);
HoodieSchema decimalFieldSchema =
tripSchema.getField("decfield").get().schema();
BigDecimal decoded =
HoodieAvroUtils.convertBytesToBigDecimal(((ByteBuffer)
genrec.get("decfield")).array(),
(LogicalTypes.Decimal)
decimalFieldSchema.toAvroSchema().getLogicalType());
@@ -578,7 +578,7 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
data.put("fare", rand.nextDouble() * 100);
data.put("_hoodie_is_deleted", false);
String json = MAPPER.writeValueAsString(data);
- GenericRecord genrec = CONVERTER.convert(json,
postProcessSchema.toAvroSchema());
+ GenericRecord genrec = CONVERTER.convert(json, postProcessSchema);
GenericData.Fixed fixed = (GenericData.Fixed) genrec.get("decfield");
Conversions.DecimalConversion decimalConverter = new
Conversions.DecimalConversion();
HoodieSchema decimalFieldSchema =
postProcessSchema.getField("decfield").get().schema();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
index 4f21aa571479..fd8f1d2b349a 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.testutils;
import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.util.FileIOUtils;
@@ -89,7 +90,7 @@ public class RawTripTestPayload implements
HoodieRecordPayload<RawTripTestPayloa
if (isDeleted) {
return Option.empty();
} else {
- return Option.of(JSON_CONVERTER.convert(getJsonData(), schema));
+ return Option.of(JSON_CONVERTER.convert(getJsonData(),
HoodieSchema.fromAvroSchema(schema)));
}
}
@@ -98,10 +99,6 @@ public class RawTripTestPayload implements
HoodieRecordPayload<RawTripTestPayloa
return orderingVal;
}
- public IndexedRecord getRecordToInsert(Schema schema) throws IOException {
- return JSON_CONVERTER.convert(getJsonData(), schema);
- }
-
@Override
public Option<Map<String, String>> getMetadata() {
// Let's assume we want to count the number of input row change events
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index 3a714c6361ad..6e10245f73e6 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -318,7 +318,7 @@ public final class SchemaTestUtil {
public static GenericRecord generateAvroRecordFromJson(HoodieSchema schema,
int recordNumber, String instantTime,
String fileId, boolean populateMetaFields) throws IOException {
SampleTestRecord record = new SampleTestRecord(instantTime, recordNumber,
fileId, populateMetaFields);
- return CONVERTER.convert(record.toJsonString(), schema.toAvroSchema());
+ return CONVERTER.convert(record.toJsonString(), schema);
}
public static HoodieSchema getSchemaFromResource(Class<?> clazz, String
name, boolean withHoodieMetadata) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestLocalAvroSchemaCache.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestLocalAvroSchemaCache.java
deleted file mode 100644
index 74c1801f47e5..000000000000
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestLocalAvroSchemaCache.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.util;
-
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-
-import org.apache.avro.Schema;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestLocalAvroSchemaCache {
-
- @Test
- public void testBasicCacheUsage() {
- LocalAvroSchemaCache localAvroSchemaCache =
LocalAvroSchemaCache.getInstance();
- Integer avroSchemaCacheNum =
localAvroSchemaCache.cacheSchema(HoodieTestDataGenerator.AVRO_SCHEMA);
- Integer avroTripSchemaCacheNum =
localAvroSchemaCache.cacheSchema(HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
- Integer flatAvroSchemaCacheNum =
localAvroSchemaCache.cacheSchema(HoodieTestDataGenerator.FLATTENED_AVRO_SCHEMA);
- Integer nestAvroSchemaCacheNum =
localAvroSchemaCache.cacheSchema(HoodieTestDataGenerator.NESTED_AVRO_SCHEMA);
- Set<Integer> uniqueSet = new HashSet<>(Arrays.asList(avroSchemaCacheNum,
avroTripSchemaCacheNum, flatAvroSchemaCacheNum, nestAvroSchemaCacheNum));
- assertEquals(4, uniqueSet.size());
- assertTrue(localAvroSchemaCache.getSchema(avroSchemaCacheNum).isPresent());
- assertEquals(HoodieTestDataGenerator.AVRO_SCHEMA,
localAvroSchemaCache.getSchema(avroSchemaCacheNum).get());
-
assertTrue(localAvroSchemaCache.getSchema(avroTripSchemaCacheNum).isPresent());
- assertEquals(HoodieTestDataGenerator.AVRO_TRIP_SCHEMA,
localAvroSchemaCache.getSchema(avroTripSchemaCacheNum).get());
-
assertTrue(localAvroSchemaCache.getSchema(flatAvroSchemaCacheNum).isPresent());
- assertEquals(HoodieTestDataGenerator.FLATTENED_AVRO_SCHEMA,
localAvroSchemaCache.getSchema(flatAvroSchemaCacheNum).get());
-
assertTrue(localAvroSchemaCache.getSchema(nestAvroSchemaCacheNum).isPresent());
- assertEquals(HoodieTestDataGenerator.NESTED_AVRO_SCHEMA,
localAvroSchemaCache.getSchema(nestAvroSchemaCacheNum).get());
- }
-
- @Test
- public void testCopiesOfSameSchema() {
- LocalAvroSchemaCache localAvroSchemaCache =
LocalAvroSchemaCache.getInstance();
- Schema testSchema1 = new
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
- Schema testSchema2 = new
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
- Integer cachenum = localAvroSchemaCache.cacheSchema(testSchema1);
- Integer secondSchemaCacheNum =
localAvroSchemaCache.cacheSchema(testSchema2);
- assertEquals(cachenum, secondSchemaCacheNum);
- assertTrue(localAvroSchemaCache.getSchema(cachenum).isPresent());
- assertEquals(testSchema1, localAvroSchemaCache.getSchema(cachenum).get());
- }
-}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
index 6a4e9dbb506d..662436de9d75 100644
---
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
@@ -80,10 +80,11 @@ public class TestAvroSchemaEvolutionUtils {
HoodieSchema.fromAvroSchema(LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG))),
HoodieSchema.fromAvroSchema(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))),
HoodieSchema.create(HoodieSchemaType.STRING),
-
HoodieSchema.fromAvroSchema(LogicalTypes.uuid().addToSchema(Schema.createFixed("t1.fixed",
null, null, 16))),
+ HoodieSchema.createUUID(),
HoodieSchema.createFixed("t1.fixed", null, null, 12),
HoodieSchema.create(HoodieSchemaType.BYTES),
- HoodieSchema.fromAvroSchema(LogicalTypes.decimal(9,
4).addToSchema(Schema.createFixed("t1.fixed", null, null, 4)))};
+ HoodieSchema.createDecimal("t1.fixed", null, null, 9, 4, 4)
+ };
Type[] primitiveTypes = new Type[] {
Types.BooleanType.get(),
@@ -140,8 +141,7 @@ public class TestAvroSchemaEvolutionUtils {
HoodieSchemaField.of("timestamp",
HoodieSchema.createNullable(HoodieSchema.fromAvroSchema(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))),
null, HoodieJsonProperties.NULL_VALUE),
HoodieSchemaField.of("string",
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING)),
null, HoodieJsonProperties.NULL_VALUE),
- HoodieSchemaField.of("uuid",
HoodieSchema.createNullable(HoodieSchema.fromAvroSchema(LogicalTypes.uuid().addToSchema(
- Schema.createFixed("t1.uuid.fixed", null, null, 16)))), null,
HoodieJsonProperties.NULL_VALUE),
+ HoodieSchemaField.of("uuid",
HoodieSchema.createNullable(HoodieSchema.createUUID()), null,
HoodieJsonProperties.NULL_VALUE),
HoodieSchemaField.of("fixed",
HoodieSchema.createNullable(HoodieSchema.createFixed("t1.fixed.fixed", null,
null, 10)), null, HoodieJsonProperties.NULL_VALUE),
HoodieSchemaField.of("binary",
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.BYTES)), null,
HoodieJsonProperties.NULL_VALUE),
HoodieSchemaField.of("decimal",
HoodieSchema.createNullable(HoodieSchema.fromAvroSchema(LogicalTypes.decimal(10,
2)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index b124a60c2eda..4ab4f4bc03d2 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -117,7 +117,7 @@ public class AvroConvertor implements Serializable {
try {
initSchema();
initJsonConvertor();
- return jsonConverter.convert(json, schema.toAvroSchema());
+ return jsonConverter.convert(json, schema);
} catch (Exception e) {
String errorMessage = "Failed to convert JSON string to Avro record: ";
if (json != null) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
index a73870f60a16..031850073074 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
@@ -31,6 +31,7 @@ import org.apache.hudi.avro.processors.Parser;
import org.apache.hudi.avro.processors.TimestampMicroLogicalTypeProcessor;
import org.apache.hudi.avro.processors.TimestampMilliLogicalTypeProcessor;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -42,7 +43,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.spark.sql.Row;
@@ -100,12 +100,11 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
}
private Row convertJsonToRow(Map<String, Object> inputJson, HoodieSchema
schema) {
- Schema avroSchema = schema.toAvroSchema();
- List<Schema.Field> fields = avroSchema.getFields();
+ List<HoodieSchemaField> fields = schema.getFields();
List<Object> values = new ArrayList<>(Collections.nCopies(fields.size(),
null));
- for (Schema.Field f : fields) {
- Object val = shouldSanitize ? getFieldFromJson(f, inputJson,
avroSchema.getFullName(), invalidCharMask) : inputJson.get(f.name());
+ for (HoodieSchemaField f : fields) {
+ Object val = shouldSanitize ? getFieldFromJson(f, inputJson,
schema.getFullName(), invalidCharMask) : inputJson.get(f.name());
if (val != null) {
values.set(f.pos(),
SparkValueMetadataUtils.convertJavaTypeToSparkType(convertJsonField(val,
f.name(), f.schema()), useJava8api));
}
@@ -115,33 +114,35 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
private class DecimalToRowLogicalTypeProcessor extends
DecimalLogicalTypeProcessor {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
if (!isValidDecimalTypeConfig(schema)) {
return Pair.of(false, null);
}
- if (schema.getType() == Type.FIXED && value instanceof List<?>) {
+ HoodieSchema.Decimal decimalSchema = (HoodieSchema.Decimal) schema;
+ if (decimalSchema.isFixed() && value instanceof List<?>) {
// Case 1: Input is a list. It is expected to be raw Fixed byte array
input, and we only support
// parsing it to Fixed type.
JsonFieldProcessor processor = generateFixedTypeHandler();
Pair<Boolean, Object> fixedTypeResult = processor.convert(value, name,
schema);
if (fixedTypeResult.getLeft()) {
byte[] byteArray = (byte[]) fixedTypeResult.getRight();
- GenericFixed fixedValue = new GenericData.Fixed(schema, byteArray);
+ Schema avroSchema = schema.toAvroSchema();
+ GenericFixed fixedValue = new GenericData.Fixed(avroSchema,
byteArray);
// Convert the GenericFixed to BigDecimal
return Pair.of(true, new Conversions
.DecimalConversion()
.fromFixed(
fixedValue,
- schema,
- schema.getLogicalType()
+ avroSchema,
+ avroSchema.getLogicalType()
)
);
}
}
// Case 2: Input is a number or String number or base64 encoded string
number
- Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value,
schema);
+ Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value,
decimalSchema);
return Pair.of(parseResult.getLeft(), parseResult.getRight());
}
}
@@ -165,7 +166,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
throw new HoodieJsonToRowConversionException("Duration type is not
supported in Row object");
}
}
@@ -173,7 +174,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
private static class DateToRowLogicalTypeProcessor extends
DateLogicalTypeProcessor {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
return convertCommon(new Parser.DateParser(), value, schema);
}
}
@@ -182,7 +183,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
protected JsonFieldProcessor generateBytesTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
return Pair.of(true, value.toString().getBytes());
}
};
@@ -195,7 +196,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
private static class FixedToRowTypeProcessor extends FixedTypeProcessor {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
return Pair.of(true, convertToJavaObject(value, name, schema));
}
}
@@ -207,7 +208,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
private static class EnumToRowTypeProcessor extends EnumTypeProcessor {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
return Pair.of(true, convertToJavaObject(value, name, schema));
}
}
@@ -216,8 +217,8 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
protected JsonFieldProcessor generateRecordTypeHandler() {
return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
- return Pair.of(true, convertJsonToRow((Map<String, Object>) value,
HoodieSchema.fromAvroSchema(schema)));
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
+ return Pair.of(true, convertJsonToRow((Map<String, Object>) value,
schema));
}
};
}
@@ -230,7 +231,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
private static class TimestampMilliToRowLogicalTypeProcessor extends
TimestampMilliLogicalTypeProcessor {
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
Pair<Boolean, Object> result = convertCommon(
new Parser.LongParser() {
@Override
@@ -267,7 +268,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
private static class TimestampMicroToRowLogicalTypeProcessor extends
TimestampMicroLogicalTypeProcessor {
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
Pair<Boolean, Object> result = convertCommon(
new Parser.LongParser() {
@Override
@@ -290,7 +291,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
return convertCommon(
new Parser.LongParser() {
@Override
@@ -330,7 +331,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
private static class LocalTimestampMilliToRowLogicalTypeProcessor extends
LocalTimestampMilliLogicalTypeProcessor {
@Override
public Pair<Boolean, Object> convert(
- Object value, String name, Schema schema) {
+ Object value, String name, HoodieSchema schema) {
return convertCommon(
new Parser.LongParser() {
@Override
@@ -370,8 +371,8 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
@Override
protected JsonFieldProcessor generateArrayTypeHandler() {
return new JsonFieldProcessor() {
- private List<Object> convertToJavaObject(Object value, String name,
Schema schema) {
- Schema elementSchema = schema.getElementType();
+ private List<Object> convertToJavaObject(Object value, String name,
HoodieSchema schema) {
+ HoodieSchema elementSchema = schema.getElementType();
List<Object> listRes = new ArrayList<>();
for (Object v : (List) value) {
listRes.add(convertJsonField(v, name, elementSchema));
@@ -380,7 +381,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
}
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
return Pair.of(true,
convertToJavaObject(
value,
@@ -396,8 +397,8 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
public Map<String, Object> convertToJavaObject(
Object value,
String name,
- Schema schema) {
- Schema valueSchema = schema.getValueType();
+ HoodieSchema schema) {
+ HoodieSchema valueSchema = schema.getValueType();
Map<String, Object> mapRes = new HashMap<>();
for (Map.Entry<String, Object> v : ((Map<String, Object>)
value).entrySet()) {
mapRes.put(v.getKey(), convertJsonField(v.getValue(), name,
valueSchema));
@@ -406,7 +407,7 @@ public class MercifulJsonToRowConverter extends
MercifulJsonConverter {
}
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ public Pair<Boolean, Object> convert(Object value, String name,
HoodieSchema schema) {
return Pair.of(true, JavaConverters
.mapAsScalaMapConverter(
convertToJavaObject(
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
index 85143a2cf367..13b305317707 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchema.TimePrecision;
+import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -42,7 +43,6 @@ import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.util.Timestamps;
import org.apache.avro.Conversions;
-import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -81,8 +81,7 @@ public class ProtoConversionUtil {
* @return A HoodieSchema
*/
public static HoodieSchema getSchemaForMessageClass(Class clazz,
SchemaConfig schemaConfig) {
- Schema avroSchema = new AvroSupport(schemaConfig).getSchema(clazz);
- return HoodieSchema.fromAvroSchema(avroSchema);
+ return new AvroSupport(schemaConfig).getSchema(clazz);
}
/**
@@ -150,22 +149,21 @@ public class ProtoConversionUtil {
* 2. Convert directly from a protobuf {@link Message} to a {@link
GenericRecord} while properly handling enums and wrapped primitives mentioned
above.
*/
private static class AvroSupport {
- private static final Schema STRING_SCHEMA =
Schema.create(Schema.Type.STRING);
- private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
+ private static final HoodieSchema STRING_SCHEMA =
HoodieSchema.create(HoodieSchemaType.STRING);
// The max unsigned long value has 20 digits, so a decimal with precision
20 and scale 0 is required to represent all possible values.
// A byte array of length N can store at most floor(log_10(2^(8 × N - 1) -
1)) base 10 digits so we require N = 9.
- private static final Schema UNSIGNED_LONG_SCHEMA =
LogicalTypes.decimal(20).addToSchema(Schema.createFixed("unsigned_long", null,
"org.apache.hudi.protos", 9));
+ private static final HoodieSchema UNSIGNED_LONG_SCHEMA =
HoodieSchema.createDecimal("unsigned_long", "org.apache.hudi.protos", null, 20,
0, 9);
private static final Conversions.DecimalConversion DECIMAL_CONVERSION =
new Conversions.DecimalConversion();
private static final String OVERFLOW_DESCRIPTOR_FIELD_NAME =
"descriptor_full_name";
private static final String OVERFLOW_BYTES_FIELD_NAME = "proto_bytes";
- private static final Schema RECURSION_OVERFLOW_SCHEMA =
Schema.createRecord("recursion_overflow", null, "org.apache.hudi.proto", false,
- Arrays.asList(new Schema.Field(OVERFLOW_DESCRIPTOR_FIELD_NAME,
STRING_SCHEMA, null, ""),
- new Schema.Field(OVERFLOW_BYTES_FIELD_NAME,
Schema.create(Schema.Type.BYTES), null, getUTF8Bytes(""))));
+ private static final HoodieSchema RECURSION_OVERFLOW_SCHEMA =
HoodieSchema.createRecord("recursion_overflow", null, "org.apache.hudi.proto",
false,
+ Arrays.asList(HoodieSchemaField.of(OVERFLOW_DESCRIPTOR_FIELD_NAME,
STRING_SCHEMA, null, ""),
+ HoodieSchemaField.of(OVERFLOW_BYTES_FIELD_NAME,
HoodieSchema.create(HoodieSchemaType.BYTES), null, getUTF8Bytes(""))));
// A cache of the proto class name paired with whether wrapped primitives
should be flattened as the key and the generated avro schema as the value
- private static final Map<SchemaCacheKey, Schema> SCHEMA_CACHE = new
ConcurrentHashMap<>();
+ private static final Map<SchemaCacheKey, HoodieSchema> SCHEMA_CACHE = new
ConcurrentHashMap<>();
// A cache with a key as the pair target avro schema and the proto
descriptor for the source and the value as an array of proto field descriptors
where the order matches the avro ordering.
// When converting from proto to avro, we want to be able to iterate over
the fields in the proto in the same order as they appear in the avro schema.
- private static final Map<Pair<Schema, Descriptors.Descriptor>,
Descriptors.FieldDescriptor[]> FIELD_CACHE = new ConcurrentHashMap<>();
+ private static final Map<Pair<HoodieSchema, Descriptors.Descriptor>,
Descriptors.FieldDescriptor[]> FIELD_CACHE = new ConcurrentHashMap<>();
private static final Set<Descriptors.Descriptor>
WRAPPER_DESCRIPTORS_TO_TYPE = CollectionUtils.createImmutableSet(
StringValue.getDescriptor(),
Int32Value.getDescriptor(),
@@ -190,7 +188,7 @@ public class ProtoConversionUtil {
return (GenericRecord) convertObject(schema, message);
}
- Schema getSchema(Class c) {
+ HoodieSchema getSchema(Class c) {
return SCHEMA_CACHE.computeIfAbsent(new SchemaCacheKey(c,
wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords), key -> {
try {
Object descriptor = c.getMethod("getDescriptor").invoke(null);
@@ -213,15 +211,15 @@ public class ProtoConversionUtil {
* @return a HoodieSchema
*/
HoodieSchema getSchema(Descriptors.Descriptor descriptor) {
- return HoodieSchema.fromAvroSchema(getMessageSchema(descriptor, new
CopyOnWriteMap<>(), getNamespace(descriptor.getFullName())));
+ return getMessageSchema(descriptor, new CopyOnWriteMap<>(),
getNamespace(descriptor.getFullName()));
}
- private Schema getEnumSchema(Descriptors.EnumDescriptor enumDescriptor) {
+ private HoodieSchema getEnumSchema(Descriptors.EnumDescriptor
enumDescriptor) {
List<String> symbols = new
ArrayList<>(enumDescriptor.getValues().size());
for (Descriptors.EnumValueDescriptor valueDescriptor :
enumDescriptor.getValues()) {
symbols.add(valueDescriptor.getName());
}
- return Schema.createEnum(enumDescriptor.getName(), null,
getNamespace(enumDescriptor.getFullName()), symbols);
+ return HoodieSchema.createEnum(enumDescriptor.getName(),
getNamespace(enumDescriptor.getFullName()), null, symbols);
}
/**
@@ -233,99 +231,93 @@ public class ProtoConversionUtil {
* This value is used for a namespace when creating Avro
records to avoid an error when reusing the same class name when unraveling a
recursive schema.
* @return an avro schema
*/
- private Schema getMessageSchema(Descriptors.Descriptor descriptor,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, String path) {
+ private HoodieSchema getMessageSchema(Descriptors.Descriptor descriptor,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, String path) {
// Parquet does not handle recursive schemas so we "unravel" the proto N
levels
Integer currentRecursionCount = recursionDepths.getOrDefault(descriptor,
0);
if (currentRecursionCount >= maxRecursionDepth) {
return RECURSION_OVERFLOW_SCHEMA;
}
// The current path is used as a namespace to avoid record name
collisions within recursive schemas
- Schema result = Schema.createRecord(descriptor.getName(), null, path,
false);
recursionDepths.put(descriptor, ++currentRecursionCount);
- List<Schema.Field> fields = new
ArrayList<>(descriptor.getFields().size());
+ List<HoodieSchemaField> fields = new
ArrayList<>(descriptor.getFields().size());
for (Descriptors.FieldDescriptor fieldDescriptor :
descriptor.getFields()) {
// each branch of the schema traversal requires its own recursion
depth tracking so copy the recursionDepths map
- Schema fieldSchema = getFieldSchema(fieldDescriptor, new
CopyOnWriteMap<>(recursionDepths), path);
- fields.add(new Schema.Field(fieldDescriptor.getName(), fieldSchema,
null, getDefault(fieldSchema, fieldDescriptor)));
+ HoodieSchema fieldSchema = getFieldSchema(fieldDescriptor, new
CopyOnWriteMap<>(recursionDepths), path);
+ fields.add(HoodieSchemaField.of(fieldDescriptor.getName(),
fieldSchema, null, getDefault(fieldSchema, fieldDescriptor)));
}
- result.setFields(fields);
- return result;
+ return HoodieSchema.createRecord(descriptor.getName(), path, null,
fields);
}
- private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, String path) {
+ private HoodieSchema getFieldSchema(Descriptors.FieldDescriptor
fieldDescriptor, CopyOnWriteMap<Descriptors.Descriptor, Integer>
recursionDepths, String path) {
switch (fieldDescriptor.getType()) {
case BOOL:
- return finalizeSchema(Schema.create(Schema.Type.BOOLEAN),
fieldDescriptor);
+ return finalizeSchema(HoodieSchema.create(HoodieSchemaType.BOOLEAN),
fieldDescriptor);
case FLOAT:
- return finalizeSchema(Schema.create(Schema.Type.FLOAT),
fieldDescriptor);
+ return finalizeSchema(HoodieSchema.create(HoodieSchemaType.FLOAT),
fieldDescriptor);
case DOUBLE:
- return finalizeSchema(Schema.create(Schema.Type.DOUBLE),
fieldDescriptor);
+ return finalizeSchema(HoodieSchema.create(HoodieSchemaType.DOUBLE),
fieldDescriptor);
case ENUM:
return finalizeSchema(getEnumSchema(fieldDescriptor.getEnumType()),
fieldDescriptor);
case STRING:
- Schema stringSchema = Schema.create(Schema.Type.STRING);
- GenericData.setStringType(stringSchema,
GenericData.StringType.String);
+ HoodieSchema stringSchema =
HoodieSchema.create(HoodieSchemaType.STRING);
+ GenericData.setStringType(stringSchema.toAvroSchema(),
GenericData.StringType.String);
return finalizeSchema(stringSchema, fieldDescriptor);
case BYTES:
- return finalizeSchema(Schema.create(Schema.Type.BYTES),
fieldDescriptor);
+ return finalizeSchema(HoodieSchema.create(HoodieSchemaType.BYTES),
fieldDescriptor);
case INT32:
case SINT32:
case FIXED32:
case SFIXED32:
- return finalizeSchema(Schema.create(Schema.Type.INT),
fieldDescriptor);
+ return finalizeSchema(HoodieSchema.create(HoodieSchemaType.INT),
fieldDescriptor);
case UINT32:
case INT64:
case SINT64:
case FIXED64:
case SFIXED64:
- return finalizeSchema(Schema.create(Schema.Type.LONG),
fieldDescriptor);
+ return finalizeSchema(HoodieSchema.create(HoodieSchemaType.LONG),
fieldDescriptor);
case UINT64:
return finalizeSchema(UNSIGNED_LONG_SCHEMA, fieldDescriptor);
case MESSAGE:
String updatedPath = appendFieldNameToPath(path,
fieldDescriptor.getName());
if (!wrappedPrimitivesAsRecords &&
WRAPPER_DESCRIPTORS_TO_TYPE.contains(fieldDescriptor.getMessageType())) {
// all wrapper types have a single field, so we can get the first
field in the message's schema
- Schema nestedFieldSchema =
getFieldSchema(fieldDescriptor.getMessageType().getFields().get(0),
recursionDepths, updatedPath);
- return finalizeSchema(makeSchemaNullable(nestedFieldSchema),
fieldDescriptor);
+ HoodieSchema nestedFieldSchema =
getFieldSchema(fieldDescriptor.getMessageType().getFields().get(0),
recursionDepths, updatedPath);
+ return
finalizeSchema(HoodieSchema.createNullable(nestedFieldSchema), fieldDescriptor);
}
if (!timestampsAsRecords &&
Timestamp.getDescriptor().equals(fieldDescriptor.getMessageType())) {
// Handle timestamps as long with logical type
- Schema timestampSchema =
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
- return finalizeSchema(makeSchemaNullable(timestampSchema),
fieldDescriptor);
+ HoodieSchema timestampSchema =
HoodieSchema.createTimestampMicros();
+ return
finalizeSchema(HoodieSchema.createNullable(timestampSchema), fieldDescriptor);
}
// if message field is repeated (like a list), elements are non-null
if (fieldDescriptor.isRepeated()) {
- Schema elementSchema =
getMessageSchema(fieldDescriptor.getMessageType(), recursionDepths,
updatedPath);
+ HoodieSchema elementSchema =
getMessageSchema(fieldDescriptor.getMessageType(), recursionDepths,
updatedPath);
return finalizeSchema(elementSchema, fieldDescriptor);
}
// otherwise we create a nullable field schema
- Schema fieldSchema =
getMessageSchema(fieldDescriptor.getMessageType(), recursionDepths,
updatedPath);
- return finalizeSchema(makeSchemaNullable(fieldSchema),
fieldDescriptor);
+ HoodieSchema fieldSchema =
getMessageSchema(fieldDescriptor.getMessageType(), recursionDepths,
updatedPath);
+ return finalizeSchema(HoodieSchema.createNullable(fieldSchema),
fieldDescriptor);
case GROUP: // groups are deprecated
default:
throw new RuntimeException("Unexpected type: " +
fieldDescriptor.getType());
}
}
- private static Schema finalizeSchema(Schema schema,
Descriptors.FieldDescriptor fieldDescriptor) {
- Schema updatedSchema = schema;
+ private static HoodieSchema finalizeSchema(HoodieSchema schema,
Descriptors.FieldDescriptor fieldDescriptor) {
+ HoodieSchema updatedSchema = schema;
if (fieldDescriptor.isRepeated()) {
- updatedSchema = Schema.createArray(updatedSchema);
+ updatedSchema = HoodieSchema.createArray(updatedSchema);
}
// all fields in the oneof will be treated as nullable
- if (fieldDescriptor.getContainingOneof() != null && !(schema.getType()
== Schema.Type.UNION && schema.getTypes().get(0).getType() ==
Schema.Type.NULL)) {
- updatedSchema = makeSchemaNullable(updatedSchema);
+ if (fieldDescriptor.getContainingOneof() != null &&
!schema.isNullable()) {
+ updatedSchema = HoodieSchema.createNullable(schema);
}
return updatedSchema;
}
- private static Schema makeSchemaNullable(Schema schema) {
- return Schema.createUnion(Arrays.asList(NULL_SCHEMA, schema));
- }
-
- private Object getDefault(Schema fieldSchema, Descriptors.FieldDescriptor
fieldDescriptor) {
+ private Object getDefault(HoodieSchema fieldSchema,
Descriptors.FieldDescriptor fieldDescriptor) {
if (fieldDescriptor.isRepeated()) { // empty array as repeated fields'
default value
return Collections.emptyList();
}
@@ -352,7 +344,7 @@ public class ProtoConversionUtil {
case SFIXED64:
return 0;
case UINT64:
- return DECIMAL_CONVERSION.toFixed(new BigDecimal(BigInteger.ZERO),
fieldSchema, fieldSchema.getLogicalType()).bytes();
+ return DECIMAL_CONVERSION.toFixed(new BigDecimal(BigInteger.ZERO),
fieldSchema.toAvroSchema(),
fieldSchema.toAvroSchema().getLogicalType()).bytes();
case STRING:
case BYTES:
return "";
@@ -366,11 +358,11 @@ public class ProtoConversionUtil {
}
}
- private static Descriptors.FieldDescriptor[] getOrderedFields(Schema
schema, Message message) {
+ private static Descriptors.FieldDescriptor[] getOrderedFields(HoodieSchema
schema, Message message) {
Descriptors.Descriptor descriptor = message.getDescriptorForType();
return FIELD_CACHE.computeIfAbsent(Pair.of(schema, descriptor), key -> {
Descriptors.FieldDescriptor[] fields = new
Descriptors.FieldDescriptor[key.getLeft().getFields().size()];
- for (Schema.Field f : key.getLeft().getFields()) {
+ for (HoodieSchemaField f : key.getLeft().getFields()) {
fields[f.pos()] = key.getRight().findFieldByName(f.name());
}
return fields;
@@ -474,7 +466,7 @@ public class ProtoConversionUtil {
Map<Object, Object> mapValue = (Map) value;
Map<Object, Object> mapCopy = new HashMap<>(mapValue.size());
for (Map.Entry<Object, Object> entry : mapValue.entrySet()) {
-
mapCopy.put(convertObject(HoodieSchema.fromAvroSchema(STRING_SCHEMA),
entry.getKey()), convertObject(schema.getValueType(), entry.getValue()));
+ mapCopy.put(convertObject(STRING_SCHEMA, entry.getKey()),
convertObject(schema.getValueType(), entry.getValue()));
}
return mapCopy;
case NULL:
@@ -482,18 +474,18 @@ public class ProtoConversionUtil {
case RECORD:
GenericData.Record newRecord = new
GenericData.Record(schema.toAvroSchema());
Message messageValue = (Message) value;
- Descriptors.FieldDescriptor[] orderedFields =
getOrderedFields(schema.toAvroSchema(), messageValue);
- for (Schema.Field field : schema.toAvroSchema().getFields()) {
+ Descriptors.FieldDescriptor[] orderedFields =
getOrderedFields(schema, messageValue);
+ for (HoodieSchemaField field : schema.getFields()) {
int position = field.pos();
Descriptors.FieldDescriptor fieldDescriptor =
orderedFields[position];
Object convertedValue;
- Schema fieldSchema = field.schema();
+ HoodieSchema fieldSchema = field.schema();
// if incoming message does not contain the field, fieldDescriptor
will be null
// if the field schema is a union, it is nullable
- if (fieldSchema.getType() == Schema.Type.UNION && (fieldDescriptor
== null || (!fieldDescriptor.isRepeated() &&
!messageValue.hasField(fieldDescriptor)))) {
+ if (fieldSchema.isNullable() && (fieldDescriptor == null ||
(!fieldDescriptor.isRepeated() && !messageValue.hasField(fieldDescriptor)))) {
convertedValue = null;
} else {
- convertedValue =
convertObject(HoodieSchema.fromAvroSchema(fieldSchema), fieldDescriptor == null
? field.defaultVal() : messageValue.getField(fieldDescriptor));
+ convertedValue = convertObject(fieldSchema, fieldDescriptor ==
null ? field.defaultVal().orElse(null) :
messageValue.getField(fieldDescriptor));
}
newRecord.put(position, convertedValue);
}