alexeykudinkin commented on code in PR #6745:
URL: https://github.com/apache/hudi/pull/6745#discussion_r981583497


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetStreamWriter.java:
##########
@@ -40,6 +40,7 @@ public class HoodieSparkParquetStreamWriter implements HoodieSparkFileWriter, Au
   public HoodieSparkParquetStreamWriter(FSDataOutputStream outputStream,
       HoodieRowParquetConfig parquetConfig) throws IOException {
     this.writeSupport = parquetConfig.getWriteSupport();
+    this.writeSupport.enableLegacyFormat();

Review Comment:
   Sorry, not sure what `avro2parquetLog` is 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
        val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField)
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
+        val structTypeWithMetaField = HoodieInternalRowUtils.getCachedSchema(schemaWithMetaField)
+        val structTypeBC = sparkContext.broadcast(structType)
+        HoodieInternalRowUtils.broadcastCompressedSchema(List(structType, structTypeWithMetaField), sparkContext)

Review Comment:
   We need a simpler solution here:
   
    - We should remove the schema from `HoodieSparkRecord`s. At this point we are all familiar with the limitations of the approach Spark uses to reduce the overhead of serializing the schema along w/ every record -- it requires all schemas to be provided upfront so they can be distributed to all Executors (via `SparkConf`, which is read-only after the `SparkContext` is created).
   
    - We will have to pass the record's schema externally: we can approach it the same way it's currently done for `ExpressionPayload` in [this PR](https://github.com/apache/hudi/pull/6358#pullrequestreview-1122556175)
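The "pass the schema externally" idea above can be sketched without Spark. This is a minimal, hypothetical illustration (all names are made up, not Hudi APIs): records carry only a small schema fingerprint, and the full schemas are registered upfront on every side, so nothing schema-shaped travels with each record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: records ship only a fingerprint; full schemas are
// registered upfront on both the driver and executor sides (e.g. the way
// SparkConf-distributed state works, which is read-only after startup).
public class SchemaIdSketch {
  // Stand-in for an upfront-distributed schema registry.
  static final Map<Long, String> REGISTRY = new HashMap<>();

  static long register(String schemaJson) {
    long id = schemaJson.hashCode(); // stand-in for a real schema fingerprint
    REGISTRY.put(id, schemaJson);
    return id;
  }

  // A record on the wire carries only (fingerprint, row bytes), never the schema.
  static final class WireRecord {
    final long schemaId;
    final byte[] rowBytes;
    WireRecord(long schemaId, byte[] rowBytes) {
      this.schemaId = schemaId;
      this.rowBytes = rowBytes;
    }
  }

  public static void main(String[] args) {
    long id = register("{\"type\":\"record\",\"name\":\"r\",\"fields\":[]}");
    WireRecord wire = new WireRecord(id, new byte[]{1, 2, 3});
    // The reading side resolves the schema by id instead of deserializing it.
    System.out.println(REGISTRY.get(wire.schemaId) != null); // true
  }
}
```

The catch the comment calls out is exactly the registry line: it has to be populated before work is distributed, because the distribution channel is read-only afterwards.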



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -47,42 +50,49 @@
 import java.util.Properties;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.hudi.util.HoodieSparkRecordUtils.getNullableValAsString;
+import static org.apache.hudi.util.HoodieSparkRecordUtils.getValue;
 import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /**
  * Spark Engine-specific Implementations of `HoodieRecord`.
  */
+@DefaultSerializer(HoodieSparkRecordSerializer.class)
 public class HoodieSparkRecord extends HoodieRecord<InternalRow> {

Review Comment:
   @wzx140 this has to be an invariant -- `HoodieSparkRecord` has to hold only an `UnsafeRow` (as I called out before, for the same reasons `HoodieRecordPayload` holds bytes and not objects).
   
   This is necessary to make sure:
   
    - We don't hold a lot of small objects in memory (one for every cell in every row)
    - The de/serialization procedure is simple (we write bytes in, we read bytes out, no object deserialization is performed)
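The "bytes in, bytes out" property argued for above can be shown with a tiny Spark-free sketch. This is not Hudi code; the `Record` class is a hypothetical stand-in for a record that holds only an `UnsafeRow`'s backing bytes, so de/serialization degenerates to a length-prefixed copy with no per-cell objects.

```java
import java.io.*;
import java.util.Arrays;

// Hypothetical sketch of the invariant: the record holds only raw row bytes,
// so serialization is a plain copy -- no object graph is ever materialized.
public class ByteBackedRecordSketch {
  static final class Record {
    final byte[] rowBytes; // stand-in for UnsafeRow's backing bytes
    Record(byte[] rowBytes) { this.rowBytes = rowBytes; }
  }

  static byte[] serialize(Record r) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(r.rowBytes.length); // length prefix
    out.write(r.rowBytes);           // raw bytes, nothing else
    return bos.toByteArray();
  }

  static Record deserialize(byte[] wire) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
    byte[] rowBytes = new byte[in.readInt()];
    in.readFully(rowBytes);          // bytes out, no object deserialization
    return new Record(rowBytes);
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = {1, 2, 3, 4};
    Record restored = deserialize(serialize(new Record(payload)));
    System.out.println(Arrays.equals(payload, restored.rowBytes)); // true
  }
}
```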



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
        val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField)
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
+        val structTypeWithMetaField = HoodieInternalRowUtils.getCachedSchema(schemaWithMetaField)
+        val structTypeBC = sparkContext.broadcast(structType)
+        HoodieInternalRowUtils.broadcastCompressedSchema(List(structType, structTypeWithMetaField), sparkContext)

Review Comment:
   Sorry, I misread your original intent with this change. Unfortunately, this is not going to work: for a Broadcast to work, the exact same object has to be passed (w/in the captured closure) from the Driver to the Executors, since Broadcasts are identified by a unique `bid`. In the current setup you're just overwriting static variables w/in `HoodieInternalRowUtils`, creating new Broadcasts that are never propagated to the Executors. (The reason it works in tests is that we run a phony Spark cluster where the Driver and Executor share memory, i.e. they access the same loaded `HoodieInternalRowUtils`, so the problem is not apparent.)
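The failure mode described above can be demonstrated without a Spark cluster. In this hypothetical sketch (all names made up), a closure-like object is serialized and "shipped" once; anything the driver swaps in afterwards never reaches the already-shipped copy -- which is exactly why overwriting statics with fresh Broadcasts is invisible to Executors.

```java
import java.io.*;

// Hypothetical sketch: only the state captured at serialization time travels
// to the "executor"; later driver-side replacements are never re-shipped.
public class ClosureCaptureSketch {
  static class Task implements Serializable {
    final String schemaId; // captured when the closure is created
    Task(String schemaId) { this.schemaId = schemaId; }
  }

  static byte[] ship(Task t) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(t); // snapshot taken here
    }
    return bos.toByteArray();
  }

  static Task receive(byte[] wire) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(wire))) {
      return (Task) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Task driverSide = new Task("schema-v1");
    byte[] wire = ship(driverSide);   // closure shipped to the "executor"
    driverSide = new Task("schema-v2"); // driver swaps state; nothing re-ships
    Task executorSide = receive(wire);
    System.out.println(executorSide.schemaId); // schema-v1
  }
}
```

In a single-JVM test both "sides" also share statics, which is why the bug stays hidden there.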



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
        val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField)
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
+        val structTypeWithMetaField = HoodieInternalRowUtils.getCachedSchema(schemaWithMetaField)
+        val structTypeBC = sparkContext.broadcast(structType)

Review Comment:
   This will work the same way as the broadcast (each executor will deserialize the callback once).



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordSerializer.scala:
##########
@@ -38,25 +40,20 @@ import scala.collection.mutable
  * schema, as to reduce network IO.
  * Actions like parsing or compressing schemas are computationally expensive so the serializer
  * caches all previously seen values as to reduce the amount of work needed to do.
- * @param schemas a map where the keys are unique IDs for spark schemas and the values are the
- *                string representation of the Avro schema, used to decrease the amount of data
- *                that needs to be serialized.
  */
-class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] {
+class HoodieSparkRecordSerializer extends KSerializer[HoodieSparkRecord] {
   /** Used to reduce the amount of effort to compress the schema */
   private val compressCache = new mutable.HashMap[StructType, Array[Byte]]()
   private val decompressCache = new mutable.HashMap[ByteBuffer, StructType]()
 
-  /** Fingerprinting is very expensive so this alleviates most of the work */
-  private val fingerprintCache = new mutable.HashMap[StructType, Long]()
-  private val schemaCache = new mutable.HashMap[Long, StructType]()
-
  // GenericAvroSerializer can't take a SparkConf in the constructor b/c then it would become
  // a member of KryoSerializer, which would make KryoSerializer not Serializable.  We make
  // the codec lazy here just b/c in some unit tests, we use a KryoSerializer w/out having
  // the SparkEnv set (note those tests would fail if they tried to serialize avro data).
   private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
 
+  private var objSerializerMap = new ConcurrentHashMap[Kryo, FieldSerializer[HoodieSparkRecord]]

Review Comment:
   Please check my previous comment about serializing `HoodieSparkRecord`: 
   
   - We should NOT be using either Java or Kryo to serialize `HoodieSparkRecord` itself; instead
   - We should just write out the bytes held by the `UnsafeRow` w/in the `HoodieSparkRecord`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]