alexeykudinkin commented on code in PR #7395:
URL: https://github.com/apache/hudi/pull/7395#discussion_r1042848019


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -289,65 +328,90 @@ object ExpressionPayload {
   val PAYLOAD_RECORD_AVRO_SCHEMA = "hoodie.payload.record.schema"
 
   /**
-   * A cache for the serializedConditionAssignments to the compiled class 
after CodeGen.
-   * The Map[IExpressionEvaluator, IExpressionEvaluator] is the map of the 
condition expression
-   * to the assignments expression.
+   * To avoid compiling projections for Merge Into expressions for every 
record these
+   * are cached under a key of
+   * <ol>
+   *    <li>Expression's (textual) representation</li>
+   *    <li>Expected input-schema</li>
+   * </ol>
+   *
+   * NOTE: Schema is required b/c this cache is static and might be shared by 
multiple
+   *       executed statements w/in a single Spark session
    */
-  private val cache = Caffeine.newBuilder()
+  private val projectionsCache = Caffeine.newBuilder()
     .maximumSize(1024)
-    .build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()
+    .build[(String, Schema), Map[Projection, Projection]]()
 
   private val schemaCache = Caffeine.newBuilder()
     .maximumSize(16).build[String, Schema]()
 
+  private val mergedSchemaCache = Caffeine.newBuilder()
+    .maximumSize(16).build[(Schema, Schema), Schema]()
+
+  private val avroDeserializerCache = Caffeine.newBuilder()
+    .maximumSize(16).build[Schema, HoodieAvroDeserializer]()
+
+  private val avroSerializerCache = Caffeine.newBuilder()
+    .maximumSize(16).build[Schema, HoodieAvroSerializer]()
+
   private def parseSchema(schemaStr: String): Schema = {
     schemaCache.get(schemaStr,
       new Function[String, Schema] {
         override def apply(t: String): Schema = new Schema.Parser().parse(t)
     })
   }
 
+  private def getAvroDeserializerFor(schema: Schema) = {
+    avroDeserializerCache.get(schema, new Function[Schema, 
HoodieAvroDeserializer] {
+      override def apply(t: Schema): HoodieAvroDeserializer =
+        sparkAdapter.createAvroDeserializer(schema, 
convertAvroSchemaToStructType(schema))
+    })
+  }
+
+  private def getAvroSerializerFor(schema: Schema) = {
+    avroSerializerCache.get(schema, new Function[Schema, HoodieAvroSerializer] 
{
+      override def apply(t: Schema): HoodieAvroSerializer =
+        
sparkAdapter.createAvroSerializer(convertAvroSchemaToStructType(schema), 
schema, isNullable(schema))
+    })
+  }
+
   /**
-   * Do the CodeGen for each condition and assignment expressions.We will 
cache it to reduce
+   * Do the CodeGen for each condition and assignment expressions.We will 
projectionsCache it to reduce

Review Comment:
   Will address in a follow-up (have another PR for Spark SQL)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to