alexeykudinkin commented on code in PR #6745:
URL: https://github.com/apache/hudi/pull/6745#discussion_r981583497
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetStreamWriter.java:
##########
@@ -40,6 +40,7 @@ public class HoodieSparkParquetStreamWriter implements HoodieSparkFileWriter, Au
   public HoodieSparkParquetStreamWriter(FSDataOutputStream outputStream, HoodieRowParquetConfig parquetConfig) throws IOException {
this.writeSupport = parquetConfig.getWriteSupport();
+ this.writeSupport.enableLegacyFormat();
Review Comment:
Sorry, not sure what `avro2parquetLog` is
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField)
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
+        val structTypeWithMetaField = HoodieInternalRowUtils.getCachedSchema(schemaWithMetaField)
+        val structTypeBC = sparkContext.broadcast(structType)
+        HoodieInternalRowUtils.broadcastCompressedSchema(List(structType, structTypeWithMetaField), sparkContext)
Review Comment:
We need a simpler solution here:
- We should remove the schema from `HoodieSparkRecord`s. By now we're familiar with all the limitations of the approach Spark uses to reduce the overhead of serializing the schema along w/ every record -- it requires us to provide all the schemas upfront so they can be distributed to all Executors (via `SparkConf`, which is read-only after the `SparkContext` is created).
- We will have to pass the record's schema externally: we can approach it the same way it's currently done for `ExpressionPayload` in [this PR](https://github.com/apache/hudi/pull/6358#pullrequestreview-1122556175)
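To illustrate the "provide all schemas upfront" idea, here is a toy JDK-only sketch (not actual Hudi/Spark APIs; the class, the fingerprint function, and all names are made up): schemas are registered on the driver keyed by a fingerprint, and each record then only needs to carry the fingerprint, never the schema text itself.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: register every schema up front (driver side), keyed by
// a fingerprint; records reference schemas by fingerprint only.
class SchemaRegistrySketch {
    private final Map<Long, String> byFingerprint = new ConcurrentHashMap<>();

    // Stand-in for a real Avro schema fingerprint (illustration only).
    static long fingerprint(String schemaJson) {
        long h = 1125899906842597L;
        for (int i = 0; i < schemaJson.length(); i++) {
            h = 31 * h + schemaJson.charAt(i);
        }
        return h;
    }

    // Called on the driver before work is distributed; returns the record's key.
    long register(String schemaJson) {
        long fp = fingerprint(schemaJson);
        byFingerprint.put(fp, schemaJson);
        return fp;
    }

    // Called wherever the record is consumed: resolve the schema by its key.
    String resolve(long fp) {
        return byFingerprint.get(fp);
    }
}
```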
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -47,42 +50,49 @@
import java.util.Properties;
import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.hudi.util.HoodieSparkRecordUtils.getNullableValAsString;
+import static org.apache.hudi.util.HoodieSparkRecordUtils.getValue;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/**
* Spark Engine-specific Implementations of `HoodieRecord`.
*/
+@DefaultSerializer(HoodieSparkRecordSerializer.class)
public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
Review Comment:
@wzx140 this has to be an invariant -- `HoodieSparkRecord` have to only hold
`UnsafeRow` (as i called out before for the same reasons HoodieRecordPayload
holds bytes and not objects).
This is necessary to make sure
- We don't hold in memory a lot of small objects (for every cell in every
row)
- De/Serialization procedure is simple (we write bytes in, we read bytes
out, no object deserialization is performed)
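The "bytes in, bytes out" shape can be sketched with plain JDK streams (toy example, not Hudi/Spark code; the class name and the idea of a raw `rowBytes` buffer standing in for `UnsafeRow`'s backing bytes are assumptions for illustration):

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch: a record that only holds the row's raw bytes, so
// de/serialization is a length-prefixed byte copy -- no per-cell objects are
// materialized and no object graph is walked.
class BytesOnlyRecordSketch {
    private final byte[] rowBytes; // stand-in for UnsafeRow's backing bytes

    BytesOnlyRecordSketch(byte[] rowBytes) {
        this.rowBytes = rowBytes;
    }

    void write(DataOutputStream out) throws IOException {
        out.writeInt(rowBytes.length); // bytes in ...
        out.write(rowBytes);
    }

    static BytesOnlyRecordSketch read(DataInputStream in) throws IOException {
        byte[] buf = new byte[in.readInt()]; // ... bytes out
        in.readFully(buf);
        return new BytesOnlyRecordSketch(buf);
    }

    byte[] bytes() {
        return rowBytes;
    }
}
```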
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField)
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
+        val structTypeWithMetaField = HoodieInternalRowUtils.getCachedSchema(schemaWithMetaField)
+        val structTypeBC = sparkContext.broadcast(structType)
+        HoodieInternalRowUtils.broadcastCompressedSchema(List(structType, structTypeWithMetaField), sparkContext)
Review Comment:
Sorry, I misread your original intent with this change. Unfortunately, this is not going to work: for a Broadcast to work, exactly the same object has to be passed (w/in the captured Closure) from the Driver to the Executors (Broadcasts are identified by a unique `bid`). In the current setup you're just overwriting static variables w/in `HoodieInternalRowUtils`, creating new Broadcasts which are never propagated to the Executors (the reason it works in tests is b/c we run a phony Spark cluster where the Driver and Executor share memory, i.e. they access the same loaded `HoodieInternalRowUtils`, so the problem is not apparent)
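A toy model of the failure mode (plain JDK, deliberately NOT Spark's Broadcast API; the registry/snapshot mechanics here are a simplification): executors can only resolve ids the driver actually registered before the closure was captured, so a "broadcast" minted later by static code is invisible to them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of broadcast-by-id resolution.
class BroadcastIdSketch {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, Object> driverRegistry = new HashMap<>();

    // Driver side: register the value and hand out its unique id.
    long broadcast(Object value) {
        long id = nextId.getAndIncrement();
        driverRegistry.put(id, value);
        return id;
    }

    // What ships with the captured closure: a snapshot taken at capture time.
    Map<Long, Object> shippedSnapshot() {
        return new HashMap<>(driverRegistry);
    }

    // "Executor" side: only ids present in the shipped snapshot resolve.
    static Object resolve(Map<Long, Object> shipped, long id) {
        return shipped.get(id);
    }
}
```

In the toy model, a value broadcast after the snapshot is taken resolves to nothing on the executor side, which is the analogue of overwriting a static Broadcast variable after the job has been submitted.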
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField)
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
+        val structTypeWithMetaField = HoodieInternalRowUtils.getCachedSchema(schemaWithMetaField)
+        val structTypeBC = sparkContext.broadcast(structType)
Review Comment:
This will work the same way as the broadcast (each executor will deserialize
the callback once)
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordSerializer.scala:
##########
@@ -38,25 +40,20 @@ import scala.collection.mutable
  * schema, as to reduce network IO.
  * Actions like parsing or compressing schemas are computationally expensive so the serializer
  * caches all previously seen values as to reduce the amount of work needed to do.
- * @param schemas a map where the keys are unique IDs for spark schemas and the values are the
- *                string representation of the Avro schema, used to decrease the amount of data
- *                that needs to be serialized.
  */
-class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] {
+class HoodieSparkRecordSerializer extends KSerializer[HoodieSparkRecord] {
   /** Used to reduce the amount of effort to compress the schema */
   private val compressCache = new mutable.HashMap[StructType, Array[Byte]]()
   private val decompressCache = new mutable.HashMap[ByteBuffer, StructType]()
-  /** Fingerprinting is very expensive so this alleviates most of the work */
-  private val fingerprintCache = new mutable.HashMap[StructType, Long]()
-  private val schemaCache = new mutable.HashMap[Long, StructType]()
-
   // GenericAvroSerializer can't take a SparkConf in the constructor b/c then it would become
   // a member of KryoSerializer, which would make KryoSerializer not Serializable. We make
   // the codec lazy here just b/c in some unit tests, we use a KryoSerializer w/out having
   // the SparkEnv set (note those tests would fail if they tried to serialize avro data).
   private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
+  private var objSerializerMap = new ConcurrentHashMap[Kryo, FieldSerializer[HoodieSparkRecord]]
Review Comment:
Please check my previous comment about serializing `HoodieSparkRecord`:
- We should NOT be using either Java or Kryo to serialize `HoodieSparkRecord` itself; instead
- We should just write out the bytes that are being held by the `UnsafeRow` w/in `HoodieSparkRecord`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]