alexeykudinkin commented on code in PR #6745:
URL: https://github.com/apache/hudi/pull/6745#discussion_r980535029


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -91,21 +59,14 @@ private static Option<String> 
getNullableValAsString(StructType structType, Inte
    * @param structType  {@link StructType} instance.
    * @return Column value if a single column, or concatenated String values by 
comma.
    */
-  public static Object getRecordColumnValues(InternalRow row,
+  public static List<Object> getRecordColumnValues(InternalRow row,
       String[] columns,
       StructType structType, boolean consistentLogicalTimestampEnabled) {
-    if (columns.length == 1) {
-      NestedFieldPath posList = 
HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
-      return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
-    } else {
-      // TODO this is inefficient, instead we can simply return array of 
Comparable
-      StringBuilder sb = new StringBuilder();
-      for (String col : columns) {
-        // TODO support consistentLogicalTimestampEnabled
-        NestedFieldPath posList = 
HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
-        return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
-      }
-      return sb.toString();
+    List<Object> result = new ArrayList<>();
+    for (String col : columns) {
+      NestedFieldPath posList = 
HoodieInternalRowUtils.getCachedPosList(structType, col);
+      
result.add(Option.ofNullable(HoodieUnsafeRowUtils.getNestedInternalRowValue(row,
 posList)).orElse("").toString());

Review Comment:
   @wzx140 We should not be converting values to strings



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -91,21 +59,14 @@ private static Option<String> 
getNullableValAsString(StructType structType, Inte
    * @param structType  {@link StructType} instance.
    * @return Column value if a single column, or concatenated String values by 
comma.
    */
-  public static Object getRecordColumnValues(InternalRow row,
+  public static List<Object> getRecordColumnValues(InternalRow row,
       String[] columns,
       StructType structType, boolean consistentLogicalTimestampEnabled) {
-    if (columns.length == 1) {
-      NestedFieldPath posList = 
HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
-      return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
-    } else {
-      // TODO this is inefficient, instead we can simply return array of 
Comparable
-      StringBuilder sb = new StringBuilder();
-      for (String col : columns) {
-        // TODO support consistentLogicalTimestampEnabled
-        NestedFieldPath posList = 
HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
-        return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
-      }
-      return sb.toString();
+    List<Object> result = new ArrayList<>();
+    for (String col : columns) {
+      NestedFieldPath posList = 
HoodieInternalRowUtils.getCachedPosList(structType, col);
+      
result.add(Option.ofNullable(HoodieUnsafeRowUtils.getNestedInternalRowValue(row,
 posList)).orElse("").toString());
     }
+    return FlatLists.of(result);

Review Comment:
   Let's return arrays instead (then we don't need to worry about them being 
comparable; it becomes the caller's responsibility to ensure that)
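
A minimal sketch of the shape being suggested, not the actual Hudi code: the row is modeled as a plain `Map` (a hypothetical stand-in for `InternalRow`), and `ColumnValuesSketch` is an illustrative name. The point is only that raw values are returned in an array, with no `toString()` conversion and no comparability requirement.

```java
import java.util.Map;

// Hypothetical stand-in: a row is modeled as a Map instead of Spark's InternalRow.
final class ColumnValuesSketch {

  // Returns the raw column values in the order of `columns`.
  // Values are not converted to strings; a missing column yields a null entry.
  // Whether the values are comparable is left to the caller.
  static Object[] getRecordColumnValues(Map<String, Object> row, String[] columns) {
    Object[] result = new Object[columns.length];
    for (int i = 0; i < columns.length; i++) {
      result[i] = row.get(columns[i]);
    }
    return result;
  }
}
```

A caller that needs ordering would then check or cast the individual entries itself.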



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java:
##########
@@ -143,6 +143,7 @@ protected <T> ClosableIterator<HoodieRecord<T>> 
readRecordsFromBlockPayload(Hood
     //       is appropriately carried over
     Configuration inlineConf = new 
Configuration(blockContentLoc.getHadoopConf());
     inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", 
InLineFileSystem.class.getName());
+    inlineConf.set("spark.sql.parquet.writeLegacyFormat", "true");

Review Comment:
   @wzx140 we need to review all the changes holistically to make sure we're 
not landing any unilateral config changes like that along w/ RFC-46



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala:
##########
@@ -267,8 +277,23 @@ object HoodieInternalRowUtils {
     }
   }
 
+  def broadcastCompressedSchema(schemas: List[StructType], sc: SparkContext): 
Unit = {
+    schemas.foreach(addCompressedSchema)
+    fingerPrintSchemaMapBC = sc.broadcast(fingerPrintSchemaMap)
+    schemaFingerPrintMapBC = sc.broadcast(schemaFingerPrintMap)
+  }
+
   def containsCompressedSchema(schema: StructType): Boolean = {
-    fingerPrintSchemaMap.containsKey(schema)
+    fingerPrintSchemaMap.containsKey(schema) || (fingerPrintSchemaMapBC != null
+      && fingerPrintSchemaMapBC.value.containsKey(schema))
+  }
+
+  /**
+   * For UT.

Review Comment:
   There's a common annotation used for this: `@VisibleForTesting`



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala:
##########
@@ -267,8 +277,23 @@ object HoodieInternalRowUtils {
     }
   }
 
+  def broadcastCompressedSchema(schemas: List[StructType], sc: SparkContext): 
Unit = {

Review Comment:
   Let's avoid over-complicating the implementation unnecessarily. As we've 
discussed before, we don't need to share the caches b/w 
`HoodieSparkRecordSerializer` and `HoodieInternalRowUtils`; they are fine 
having separate caches of their own.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetStreamWriter.java:
##########
@@ -40,6 +40,7 @@ public class HoodieSparkParquetStreamWriter implements 
HoodieSparkFileWriter, Au
   public HoodieSparkParquetStreamWriter(FSDataOutputStream outputStream,
       HoodieRowParquetConfig parquetConfig) throws IOException {
     this.writeSupport = parquetConfig.getWriteSupport();
+    this.writeSupport.enableLegacyFormat();

Review Comment:
   Why are we doing that?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -470,10 +481,8 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
       usesVirtualKeys = !tableConfig.populateMetaFields(),
       recordPayloadClassName = tableConfig.getPayloadClass,
       metadataConfig = fileIndex.metadataConfig,
-      mergerImpls = optParams.getOrElse(HoodieWriteConfig.MERGER_IMPLS.key(),
-        HoodieWriteConfig.MERGER_IMPLS.defaultValue()),
-      mergerStrategy = 
optParams.getOrElse(HoodieWriteConfig.MERGER_STRATEGY.key(),
-        metaClient.getTableConfig.getMergerStrategy)
+      mergerImpls = mergerImpls,

Review Comment:
   Leaving comments here for `generateRecordMerger` as well:
   
    - Let's do config parsing here, passing in `Seq[String]` instead of 
`String` 
    - Let's rename `generateRecordMerger` to `createRecordMerger`
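
A rough sketch of the first point, written in Java with `List<String>` as the analogue of `Seq[String]`. It assumes the merger-impls config value is a comma-separated list of class names; `MergerConfigSketch` and `parseMergerImpls` are hypothetical names for illustration only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class MergerConfigSketch {

  // Parses the raw config value once, at the call site, so that downstream
  // code receives a list of class names rather than an unparsed string.
  // Assumption: the value is comma-separated.
  static List<String> parseMergerImpls(String configValue) {
    if (configValue == null) {
      return Collections.emptyList();
    }
    return Arrays.stream(configValue.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toList());
  }
}
```

With this in place, `createRecordMerger` would take the parsed list and would not need to know the config's string format.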



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")

Review Comment:
   Let's generalize this to log whichever record type is used. Let's also 
downgrade it to debug
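
Roughly what I have in mind, sketched with `java.util.logging` so it stays self-contained (`Level.FINE` plays the role of debug here; the real code would use the project's own logger). The `RecordType` enum and the message text are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class RecordTypeLoggingSketch {
  private static final Logger LOG =
      Logger.getLogger(RecordTypeLoggingSketch.class.getName());

  // Hypothetical mirror of the record-type enum.
  enum RecordType { AVRO, SPARK }

  // One message for every record type instead of a hard-coded string per branch.
  static String describe(RecordType recordType) {
    return "Using record type: " + recordType;
  }

  static void logRecordType(RecordType recordType) {
    // The supplier is only evaluated when FINE (debug-level) logging is enabled.
    LOG.log(Level.FINE, () -> describe(recordType));
  }
}
```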



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -152,8 +156,7 @@ protected <T> void processNextRecord(HoodieRecord<T> 
hoodieRecord) throws IOExce
       T combinedValue = ((HoodieRecord<T>) recordMerger.merge(oldRecord, 
hoodieRecord, readerSchema, 
this.hoodieTableMetaClient.getTableConfig().getProps()).get()).getData();
       // If combinedValue is oldValue, no need rePut oldRecord
       if (combinedValue != oldValue) {
-        hoodieRecord.setData(combinedValue);
-        records.put(key, hoodieRecord);
+        records.put(key, hoodieRecord.newInstance(combinedValue));

Review Comment:
   But that doesn't make sense, right? Previously `preCombine` would return 
`HoodieRecordPayload`; now we're returning `HoodieRecord`, then calling 
`getData` and immediately writing the result back into a `HoodieRecord`.
   
   Let's simplify it
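
One possible reading of the simplification, as a self-contained sketch: since the merger already hands back a whole record, store that record directly instead of unwrapping and re-wrapping its data. `Rec` is a hypothetical stand-in for `HoodieRecord`, and the merger is modeled as a plain `BinaryOperator`.

```java
import java.util.Map;
import java.util.function.BinaryOperator;

final class MergeSimplificationSketch {

  // Hypothetical stand-in for HoodieRecord<T>.
  static final class Rec {
    final String key;
    final String data;

    Rec(String key, String data) {
      this.key = key;
      this.data = data;
    }
  }

  // The merger returns a full record, so it can be stored as-is.
  static void processNextRecord(Map<String, Rec> records, Rec incoming,
                                BinaryOperator<Rec> merger) {
    Rec existing = records.get(incoming.key);
    if (existing == null) {
      records.put(incoming.key, incoming);
      return;
    }
    Rec merged = merger.apply(existing, incoming);
    // If the merger kept the existing record, there is nothing to re-put.
    if (merged != existing) {
      records.put(incoming.key, merged);
    }
  }
}
```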



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java:
##########
@@ -143,6 +143,7 @@ protected <T> ClosableIterator<HoodieRecord<T>> 
readRecordsFromBlockPayload(Hood
     //       is appropriately carried over
     Configuration inlineConf = new 
Configuration(blockContentLoc.getHadoopConf());
     inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", 
InLineFileSystem.class.getName());
+    inlineConf.set("spark.sql.parquet.writeLegacyFormat", "true");

Review Comment:
   Please remove this one as well



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = 
keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, 
config.allowOperationMetadataField)
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
+        val structTypeWithMetaField = 
HoodieInternalRowUtils.getCachedSchema(schemaWithMetaField)
+        val structTypeBC = sparkContext.broadcast(structType)
+        HoodieInternalRowUtils.broadcastCompressedSchema(List(structType, 
structTypeWithMetaField), sparkContext)

Review Comment:
   Why do we need this?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -454,9 +456,8 @@ private object HoodieMergeOnReadRDD {
   }
 
   private def registerStructTypeSerializerIfNeed(schemas: List[StructType]): 
Unit = {

Review Comment:
   Why do we need this method? 
   Schema registration should be ad hoc: whenever someone requests to 
serialize a `HoodieSparkRecord`, we'd cache the schema, replacing it w/ the 
fingerprint
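
A single-JVM sketch of what ad hoc registration could look like. The schema is represented as a JSON string and fingerprinted with CRC32 purely as a stand-in (the real fingerprinting scheme is not shown here, and collisions are ignored); all names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.CRC32;

final class AdHocSchemaRegistrySketch {
  private final ConcurrentHashMap<Long, String> fingerprintToSchema =
      new ConcurrentHashMap<>();

  // Called from the serialization path: registers the schema on first use
  // and returns the fingerprint to write in place of the schema itself.
  long fingerprintFor(String schemaJson) {
    long fp = fingerprint(schemaJson);
    fingerprintToSchema.putIfAbsent(fp, schemaJson);
    return fp;
  }

  // Called from the deserialization path; returns null for unknown fingerprints.
  String schemaFor(long fingerprint) {
    return fingerprintToSchema.get(fingerprint);
  }

  // Stand-in fingerprint: CRC32 over the UTF-8 bytes of the schema string.
  static long fingerprint(String schemaJson) {
    CRC32 crc = new CRC32();
    crc.update(schemaJson.getBytes(StandardCharsets.UTF_8));
    return crc.getValue();
  }
}
```

No up-front registration call is needed in this shape; the first serialization request populates the cache.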



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = 
keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, 
config.allowOperationMetadataField)
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
+        val structTypeWithMetaField = 
HoodieInternalRowUtils.getCachedSchema(schemaWithMetaField)
+        val structTypeBC = sparkContext.broadcast(structType)

Review Comment:
   We don't need a broadcast for that (`StructType` could be passed in as a 
closure param)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -203,6 +203,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
         maxCompactionMemoryInBytes, config, internalSchema)
     }
 
+    log.info(s"Use ${logScanner.getRecordType}")

Review Comment:
   Please downgrade to debug



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -77,6 +77,10 @@ public WriteSupport.FinalizedWriteContext finalizeWrite() {
     return new WriteSupport.FinalizedWriteContext(extraMetaData);
   }
 
+  public void enableLegacyFormat() {
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", "true");

Review Comment:
   We should not need that



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordSerializer.scala:
##########
@@ -38,25 +40,20 @@ import scala.collection.mutable
  * schema, as to reduce network IO.
  * Actions like parsing or compressing schemas are computationally expensive 
so the serializer
  * caches all previously seen values as to reduce the amount of work needed to 
do.
- * @param schemas a map where the keys are unique IDs for spark schemas and 
the values are the
- *                string representation of the Avro schema, used to decrease 
the amount of data
- *                that needs to be serialized.
  */
-class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends 
KSerializer[HoodieSparkRecord] {
+class HoodieSparkRecordSerializer extends KSerializer[HoodieSparkRecord] {
   /** Used to reduce the amount of effort to compress the schema */
   private val compressCache = new mutable.HashMap[StructType, Array[Byte]]()
   private val decompressCache = new mutable.HashMap[ByteBuffer, StructType]()
 
-  /** Fingerprinting is very expensive so this alleviates most of the work */
-  private val fingerprintCache = new mutable.HashMap[StructType, Long]()
-  private val schemaCache = new mutable.HashMap[Long, StructType]()
-
   // GenericAvroSerializer can't take a SparkConf in the constructor b/c then 
it would become
   // a member of KryoSerializer, which would make KryoSerializer not 
Serializable.  We make
   // the codec lazy here just b/c in some unit tests, we use a KryoSerializer 
w/out having
   // the SparkEnv set (note those tests would fail if they tried to serialize 
avro data).
   private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
 
+  private var objSerializerMap = new ConcurrentHashMap[Kryo, 
FieldSerializer[HoodieSparkRecord]]

Review Comment:
   Please check my other comment: let's keep previous implementation that was 
easier to comprehend and manage



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -47,42 +50,49 @@
 import java.util.Properties;
 
 import static 
org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static 
org.apache.hudi.util.HoodieSparkRecordUtils.getNullableValAsString;
+import static org.apache.hudi.util.HoodieSparkRecordUtils.getValue;
 import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /**
  * Spark Engine-specific Implementations of `HoodieRecord`.
  */
+@DefaultSerializer(HoodieSparkRecordSerializer.class)
 public class HoodieSparkRecord extends HoodieRecord<InternalRow> {

Review Comment:
   That's the case for now. We need to make sure that it doesn't fail when 
someone starts using it and fills it w/ `GenericInternalRow`. 
   
   We need to add an `UnsafeProjection` into the ctor (which will be a no-op 
whenever we're getting an `UnsafeRow` already)
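
To illustrate the idea without pulling in Spark, here is a sketch with a hypothetical row hierarchy: `GenericRow` stands in for `GenericInternalRow`, `CompactRow` for `UnsafeRow`, and `toCompact` for applying an `UnsafeProjection`. None of these are real Spark or Hudi types.

```java
final class ProjectionInCtorSketch {

  // Hypothetical row abstraction standing in for Spark's InternalRow.
  interface Row {
    Object get(int i);
    int size();
  }

  // Stand-in for GenericInternalRow.
  static final class GenericRow implements Row {
    private final Object[] values;
    GenericRow(Object[] values) { this.values = values; }
    public Object get(int i) { return values[i]; }
    public int size() { return values.length; }
  }

  // Stand-in for UnsafeRow, the representation the record expects to hold.
  static final class CompactRow implements Row {
    private final Object[] values;
    CompactRow(Object[] values) { this.values = values; }
    public Object get(int i) { return values[i]; }
    public int size() { return values.length; }
  }

  // Stand-in for the projection: a no-op when the row is already compact.
  static Row toCompact(Row row) {
    if (row instanceof CompactRow) {
      return row;
    }
    Object[] copy = new Object[row.size()];
    for (int i = 0; i < copy.length; i++) {
      copy[i] = row.get(i);
    }
    return new CompactRow(copy);
  }

  // The constructor normalizes whatever row type it is handed.
  static final class Record {
    final Row data;
    Record(Row data) { this.data = toCompact(data); }
  }
}
```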



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

