yihua commented on code in PR #13654:
URL: https://github.com/apache/hudi/pull/13654#discussion_r2268143390
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -19,68 +19,307 @@
package org.apache.hudi.hadoop.utils;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.function.UnaryOperator;
+import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
+import static org.apache.hudi.avro.HoodieAvroUtils.createFullName;
+import static org.apache.hudi.avro.HoodieAvroUtils.createNamePrefix;
+import static org.apache.hudi.avro.HoodieAvroUtils.getOldFieldNameWithRenaming;
+import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
public class HoodieArrayWritableAvroUtils {
- private static final Cache<Pair<Schema, Schema>, int[]>
- PROJECTION_CACHE = Caffeine.newBuilder().maximumSize(1000).build();
+ public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+ return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema,
newSchema, renameCols, new LinkedList<>());
+ }
- public static int[] getProjection(Schema from, Schema to) {
- return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> {
- List<Schema.Field> toFields = to.getFields();
- int[] newProjection = new int[toFields.size()];
- for (int i = 0; i < newProjection.length; i++) {
- newProjection[i] = from.getField(toFields.get(i).name()).pos();
- }
- return newProjection;
- });
+ private static Writable rewriteRecordWithNewSchema(Writable writable, Schema
oldAvroSchema, Schema newAvroSchema, Map<String, String> renameCols,
Deque<String> fieldNames) {
+ if (writable == null) {
+ return null;
+ }
+ Schema oldSchema = AvroSchemaUtils.resolveNullableSchema(oldAvroSchema);
+ Schema newSchema = AvroSchemaUtils.resolveNullableSchema(newAvroSchema);
+ if (oldSchema.equals(newSchema)) {
Review Comment:
Should this use the same "equivalent" equality check?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -121,6 +128,12 @@ public ClosableIterator<ArrayWritable>
getFileRecordIterator(
private ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath
filePath, String[] hosts, long start, long length, Schema dataSchema,
Schema
requiredSchema, HoodieStorage storage) throws IOException {
+ // mdt file schema irregular and does not work with this logic. Also, log
file evolution is handled inside the log block
+ boolean isParquetOrOrc =
filePath.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())
+ ||
filePath.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension());
+ Schema avroFileSchema = isParquetOrOrc ?
HoodieIOFactory.getIOFactory(storage)
+ .getFileFormatUtils(filePath).readAvroSchema(storage, filePath) :
dataSchema;
Review Comment:
What's the difference between the schema read from file and the data schema
passed in? Is the data schema assigned from the table schema?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -19,68 +19,307 @@
package org.apache.hudi.hadoop.utils;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.function.UnaryOperator;
+import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
+import static org.apache.hudi.avro.HoodieAvroUtils.createFullName;
+import static org.apache.hudi.avro.HoodieAvroUtils.createNamePrefix;
+import static org.apache.hudi.avro.HoodieAvroUtils.getOldFieldNameWithRenaming;
+import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
public class HoodieArrayWritableAvroUtils {
- private static final Cache<Pair<Schema, Schema>, int[]>
- PROJECTION_CACHE = Caffeine.newBuilder().maximumSize(1000).build();
+ public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+ return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema,
newSchema, renameCols, new LinkedList<>());
+ }
- public static int[] getProjection(Schema from, Schema to) {
- return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> {
- List<Schema.Field> toFields = to.getFields();
- int[] newProjection = new int[toFields.size()];
- for (int i = 0; i < newProjection.length; i++) {
- newProjection[i] = from.getField(toFields.get(i).name()).pos();
- }
- return newProjection;
- });
+ private static Writable rewriteRecordWithNewSchema(Writable writable, Schema
oldAvroSchema, Schema newAvroSchema, Map<String, String> renameCols,
Deque<String> fieldNames) {
+ if (writable == null) {
+ return null;
+ }
+ Schema oldSchema = AvroSchemaUtils.resolveNullableSchema(oldAvroSchema);
+ Schema newSchema = AvroSchemaUtils.resolveNullableSchema(newAvroSchema);
Review Comment:
Does this incur overhead in per-record processing?
##########
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHiveHoodieReaderContext.java:
##########
@@ -151,13 +127,6 @@ void testConstructEngineRecordWithUpdates() {
assertTrue(((BooleanWritable) values[2]).get());
}
- private JobConf getJobConf() {
- JobConf jobConf = new
JobConf(storageConfiguration.unwrapAs(Configuration.class));
- jobConf.set("columns", "field_1,field_2,field_3,datestr");
- jobConf.set("columns.types",
"string,string,struct<nested_field:string>,string");
Review Comment:
Have you tried querying a Hudi table with schema evolution using the Hive engine
(not just the unit tests) to make sure everything still works, without
leveraging this conf provided by Hive (has this changed now)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]