yihua commented on code in PR #13882:
URL: https://github.com/apache/hudi/pull/13882#discussion_r2342302277


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java:
##########
@@ -54,24 +55,26 @@ public class HoodieInternalRowFileWriterFactory {
   public static HoodieInternalRowFileWriter getInternalRowFileWriter(StoragePath path,
                                                                      HoodieTable hoodieTable,
                                                                      HoodieWriteConfig writeConfig,
-                                                                     StructType schema)
+                                                                     StructType schema,
+                                                                     Schema avroSchema)

Review Comment:
   Is it possible to only pass in `Schema avroSchema` and derive `StructType schema` here internally, or will that incur another large refactoring? Conceptually, `StructType schema` is derived somewhere else from the Avro write or table schema.
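   For illustration, one hedged shape such a refactoring could take (this assumes Hudi's `AvroConversionUtils.convertAvroSchemaToStructType` utility is available in `hudi-spark-client`; it is a sketch, not verified against this PR's code):
   ```java
   // Sketch only: keep the Avro schema as the single source of truth and derive
   // the Spark StructType inside the factory, so callers pass one schema.
   public static HoodieInternalRowFileWriter getInternalRowFileWriter(StoragePath path,
                                                                      HoodieTable hoodieTable,
                                                                      HoodieWriteConfig writeConfig,
                                                                      Schema avroSchema) {
     // Assumed helper: converts an Avro Schema to a Spark StructType.
     StructType schema = AvroConversionUtils.convertAvroSchemaToStructType(avroSchema);
     // Delegate to the existing two-schema overload so the derivation lives in one spot.
     return getInternalRowFileWriter(path, hoodieTable, writeConfig, schema, avroSchema);
   }
   ```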



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -109,11 +108,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
         Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
         new CustomizedThreadFactory("clustering-job-group", true));
     try {
-      boolean canUseRowWriter = getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true)
-          && HoodieDataTypeUtils.canUseRowWriter(schema, engineContext.hadoopConfiguration());
-      if (canUseRowWriter) {
-        HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(writeConfig.getProps(), schema);
-      }

Review Comment:
   So are all such tweaks of Parquet writer behavior through write configs now controlled in a central place in the write support class (`HoodieRowParquetWriteSupport`), so they are easier to maintain?
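   As a hedged sketch of what such centralization could look like (illustrative only; the call mirrors the override removed from `MultipleSparkJobExecutionStrategy` and may not match what this PR actually does):
   ```java
   // Sketch: apply config-driven Parquet tweaks once in the write support
   // constructor instead of at every writer call site.
   public HoodieRowParquetWriteSupport(Configuration conf, StructType schema, Schema avroSchema,
                                       Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
     // Assumed to be the same legacy-format override previously done per call site.
     HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(config.getProps(), schema);
     // ... remaining construction (bloom filter support, rebase functions, etc.)
   }
   ```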



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -18,39 +18,90 @@
 
 package org.apache.hudi.io.storage.row;
 
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.internal.LegacyBehaviorPolicy;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampNTZType;
 import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.util.VersionUtils;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.IntStream;
 
+import scala.Enumeration;
+import scala.Function1;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED;
 
 /**
  * Hoodie Write Support for directly writing Row to Parquet.
  */
-public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
+public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
 
+  private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING);
+  private static final String MAP_REPEATED_NAME = "key_value";
+  private static final String MAP_KEY_NAME = "key";
+  private static final String MAP_VALUE_NAME = "value";
   private final Configuration hadoopConf;
   private final Option<HoodieBloomFilterWriteSupport<UTF8String>> bloomFilterWriteSupportOpt;
+  private final byte[] decimalBuffer = new byte[Decimal.minBytesForPrecision()[DecimalType.MAX_PRECISION()]];
+  private final Enumeration.Value datetimeRebaseMode = LegacyBehaviorPolicy.withName(SQLConf.get().getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE()));
+  private final Function1<Object, Object> dateRebaseFunction = DataSourceUtils.createDateRebaseFuncInWrite(datetimeRebaseMode, "Parquet");
+  private final Function1<Object, Object> timestampRebaseFunction = DataSourceUtils.createTimestampRebaseFuncInWrite(datetimeRebaseMode, "Parquet");
+  private RecordConsumer recordConsumer;
+  private final boolean writeLegacyListFormat;
+  private final ValueWriter[] rootFieldWriters;
+  private final Schema avroSchema;
 
-  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType schema, Schema avroSchema, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {

Review Comment:
   Could the `Schema avroSchema` be derived within the constructor, given the write config `config` instance is available (like in `BulkInsertDataInternalWriterHelper`), so there is no need to pass in the `avroSchema` from a different place and the logic stays consistent?
   ```
   this.schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getWriteSchema()), writeConfig.allowOperationMetadataField());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
