yihua commented on code in PR #7395:
URL: https://github.com/apache/hudi/pull/7395#discussion_r1042806294
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -242,11 +287,12 @@ class ExpressionPayload(record: GenericRecord,
*
* @return
*/
- private def joinRecord(sourceRecord: IndexedRecord, targetRecord:
IndexedRecord): IndexedRecord = {
+ private def joinRecord(sourceRecord: IndexedRecord, targetRecord:
IndexedRecord): GenericRecord = {
val leftSchema = sourceRecord.getSchema
val joinSchema = getMergedSchema(leftSchema, targetRecord.getSchema)
- val values = new ArrayBuffer[AnyRef]()
+ // TODO rebase onto JoinRecord
Review Comment:
still needed?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -289,65 +328,90 @@ object ExpressionPayload {
val PAYLOAD_RECORD_AVRO_SCHEMA = "hoodie.payload.record.schema"
/**
- * A cache for the serializedConditionAssignments to the compiled class
after CodeGen.
- * The Map[IExpressionEvaluator, IExpressionEvaluator] is the map of the
condition expression
- * to the assignments expression.
+ * To avoid compiling projections for Merge Into expressions for every
record these
+ * are cached under a key of
+ * <ol>
+ * <li>Expression's (textual) representation</li>
+ * <li>Expected input-schema</li>
+ * </ol>
+ *
+ * NOTE: Schema is required b/c this cache is static and might be shared by
multiple
+ * executed statements w/in a single Spark session
*/
- private val cache = Caffeine.newBuilder()
+ private val projectionsCache = Caffeine.newBuilder()
.maximumSize(1024)
- .build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()
+ .build[(String, Schema), Map[Projection, Projection]]()
private val schemaCache = Caffeine.newBuilder()
.maximumSize(16).build[String, Schema]()
+ private val mergedSchemaCache = Caffeine.newBuilder()
+ .maximumSize(16).build[(Schema, Schema), Schema]()
+
+ private val avroDeserializerCache = Caffeine.newBuilder()
+ .maximumSize(16).build[Schema, HoodieAvroDeserializer]()
+
+ private val avroSerializerCache = Caffeine.newBuilder()
+ .maximumSize(16).build[Schema, HoodieAvroSerializer]()
+
private def parseSchema(schemaStr: String): Schema = {
schemaCache.get(schemaStr,
new Function[String, Schema] {
override def apply(t: String): Schema = new Schema.Parser().parse(t)
})
}
+ private def getAvroDeserializerFor(schema: Schema) = {
+ avroDeserializerCache.get(schema, new Function[Schema,
HoodieAvroDeserializer] {
+ override def apply(t: Schema): HoodieAvroDeserializer =
+ sparkAdapter.createAvroDeserializer(schema,
convertAvroSchemaToStructType(schema))
+ })
+ }
+
+ private def getAvroSerializerFor(schema: Schema) = {
+ avroSerializerCache.get(schema, new Function[Schema, HoodieAvroSerializer]
{
+ override def apply(t: Schema): HoodieAvroSerializer =
+
sparkAdapter.createAvroSerializer(convertAvroSchemaToStructType(schema),
schema, isNullable(schema))
+ })
+ }
+
/**
- * Do the CodeGen for each condition and assignment expressions.We will
cache it to reduce
+ * Do the CodeGen for each condition and assignment expressions.We will
projectionsCache it to reduce
Review Comment:
nit: no need to fix this appearance of `cache`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]