yihua commented on code in PR #13882:
URL: https://github.com/apache/hudi/pull/13882#discussion_r2342327656


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -18,39 +18,90 @@
 
 package org.apache.hudi.io.storage.row;
 
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.internal.LegacyBehaviorPolicy;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampNTZType;
 import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.util.VersionUtils;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.IntStream;
 
+import scala.Enumeration;
+import scala.Function1;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED;
 
 /**
  * Hoodie Write Support for directly writing Row to Parquet.
  */
-public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
+public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
 
+  private static final Schema MAP_KEY_SCHEMA = 
Schema.create(Schema.Type.STRING);
+  private static final String MAP_REPEATED_NAME = "key_value";
+  private static final String MAP_KEY_NAME = "key";
+  private static final String MAP_VALUE_NAME = "value";
   private final Configuration hadoopConf;
   private final Option<HoodieBloomFilterWriteSupport<UTF8String>> 
bloomFilterWriteSupportOpt;
+  private final byte[] decimalBuffer = new 
byte[Decimal.minBytesForPrecision()[DecimalType.MAX_PRECISION()]];
+  private final Enumeration.Value datetimeRebaseMode = 
LegacyBehaviorPolicy.withName(SQLConf.get().getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE()));
+  private final Function1<Object, Object> dateRebaseFunction = 
DataSourceUtils.createDateRebaseFuncInWrite(datetimeRebaseMode, "Parquet");
+  private final Function1<Object, Object> timestampRebaseFunction = 
DataSourceUtils.createTimestampRebaseFuncInWrite(datetimeRebaseMode, "Parquet");
+  private RecordConsumer recordConsumer;
+  private final boolean writeLegacyListFormat;
+  private final ValueWriter[] rootFieldWriters;
+  private final Schema avroSchema;
 
-  public HoodieRowParquetWriteSupport(Configuration conf, StructType 
structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType schema, 
Schema avroSchema, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {

Review Comment:
   In the future, when the `HoodieSchema` abstraction is introduced, it will be 
easier to swap the current schema representation with `HoodieSchema` in a 
limited number of places.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala:
##########
@@ -37,72 +29,4 @@ object HoodieDataTypeUtils {
    */
   def parseStructTypeFromJson(jsonSchema: String): StructType =
     StructType.fromString(jsonSchema)
-
-  def canUseRowWriter(schema: Schema, conf: Configuration): Boolean = {
-    if (conf.getBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, true)) {
-      // if we can write lists with the old list structure, we can use row 
writer regardless of decimal precision
-      true
-    } else if (!HoodieAvroUtils.hasSmallPrecisionDecimalField(schema)) {
-      true
-    } else {
-      // small precision decimals require the legacy write mode but lists and 
maps require the new write mode when
-      // WRITE_OLD_LIST_STRUCTURE is false so we can only use row writer if 
one is present and the other is not
-      if (HoodieAvroUtils.hasListOrMapField(schema)) {
-        log.warn("Cannot use row writer due to presence of list or map with a 
small precision decimal field")
-        false
-      } else {
-        true
-      }
-    }
-  }

Review Comment:
   To clarify, there are no longer any cases where the row writer should not be 
used and a fallback should be incurred, correct? And the timestamp precision is 
also properly handled?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to