alexeykudinkin commented on code in PR #4676:
URL: https://github.com/apache/hudi/pull/4676#discussion_r971277519


##########
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for Partial update Hudi Table.
+ *
+ * Simplified partial update Logic:
+ *  1 preCombine
+ *  For every record with duplicate record (same record key) in the same batch or in the delta logs that belongs to same File Group
+ *      Check if one record's ordering value is larger than the other record. If yes,overwrite the exists one for specified fields
+ *  that doesn't equal to null.
+ *
+ *  2 combineAndGetUpdateValue
+ *  For every incoming record with exists record in storage (same record key)
+ *      Check if incoming record's ordering value is larger than exists record. If yes,overwrite the exists one for specified fields
+ *  that doesn't equal to null.
+ *      else overwrite the incoming one with exists record for specified fields that doesn't equal to null
+ *  get a merged record, write to file.
+ *
+ *  Illustration with simple data.
+ *  let's say the order field is 'ts' and schema is :
+ *  {
+ *    [
+ *      {"name":"id","type":"string"},
+ *      {"name":"ts","type":"long"},
+ *      {"name":"name","type":"string"},
+ *      {"name":"price","type":"string"}
+ *    ]
+ *  }
+ *
+ *  case 1
+ *  Current data:
+ *      id      ts      name    price
+ *      1     , 1     , name_1, price_1

Review Comment:
   I think we can drop the "," separators from the row representation.
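
For context, the merge rule described in the quoted Javadoc can be sketched roughly as below. This is only an illustration under stated assumptions, not the PR's implementation: records are modeled as plain `Map`s rather than Avro `GenericRecord`s, the names `PartialMergeSketch`, `overlayNonNull`, `merge`, and `row` are hypothetical, the incoming row is made-up sample data, and letting the incoming record win on equal ordering values is an assumption the Javadoc does not state.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PartialMergeSketch {

  /** Copies {@code older} and overwrites each field for which {@code newer} holds a non-null value. */
  static Map<String, Object> overlayNonNull(Map<String, Object> older, Map<String, Object> newer) {
    Map<String, Object> merged = new LinkedHashMap<>(older);
    for (Map.Entry<String, Object> field : newer.entrySet()) {
      if (field.getValue() != null) {
        merged.put(field.getKey(), field.getValue());
      }
    }
    return merged;
  }

  /** The record with the larger ordering value supplies the non-null overrides. */
  static Map<String, Object> merge(Map<String, Object> existing,
                                   Map<String, Object> incoming,
                                   String orderingField) {
    long existingOrder = (Long) existing.get(orderingField);
    long incomingOrder = (Long) incoming.get(orderingField);
    // Assumption: on a tie the incoming record is treated as the newer one.
    return incomingOrder >= existingOrder
        ? overlayNonNull(existing, incoming)
        : overlayNonNull(incoming, existing);
  }

  /** Builds a row with the four fields of the schema in the Javadoc illustration. */
  static Map<String, Object> row(String id, long ts, String name, String price) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("id", id);
    record.put("ts", ts);
    record.put("name", name);
    record.put("price", price);
    return record;
  }

  public static void main(String[] args) {
    Map<String, Object> existing = row("1", 1L, "name_1", "price_1");
    // Hypothetical incoming row: newer ts, name left null, price updated.
    Map<String, Object> incoming = row("1", 2L, null, "price_2");
    System.out.println(merge(existing, incoming, "ts"));
    // prints {id=1, ts=2, name=name_1, price=price_2}
  }
}
```

The null `name` in the incoming row leaves the stored `name_1` in place, which is the partial-update behavior the Javadoc describes.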



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java:
##########
@@ -51,16 +55,20 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
 
   @Override
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String jsonSchema) {
     boolean isIndexingGlobal = index.isGlobal();
+    final Schema[] schema = {null};
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
       Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
       return Pair.of(key, record);
     }).reduceByKey((rec1, rec2) -> {
+      if (schema[0] == null) {
+        schema[0] = new Schema.Parser().parse(jsonSchema);

Review Comment:
   Apologies, I missed that this is handling RDDs.
   
   This works around the problem in a very cryptic and obscure way:
   Spark ships the empty array to every executor inside the closure, and each
   executor then fills in its own copy by parsing the schema only once.
   
   Instead we can use a much more straightforward solution:
   
   ```
   val broadcastedSchema = sc.broadcast(new SerializableSchema(avroSchema))
   
   // ...
      .reduceByKey((...) => {
        val schema = broadcastedSchema.value
      })
   ```
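
To make the "cryptic" part concrete, here is a small stdlib-only Java sketch of why the one-element array trick behaves the way it does. It does not use Spark: a round trip through Java serialization stands in for shipping a closure to a worker, and `CapturedArrayDemo`, `shipToWorker`, `countParses`, and the string-returning `parse` are all hypothetical stand-ins (the real code parses an Avro `Schema`). How often Spark itself deserializes a closure is not something this sketch establishes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class CapturedArrayDemo {

  /** A lambda target type that is also Serializable, so the lambda can be "shipped". */
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  static final AtomicInteger PARSE_CALLS = new AtomicInteger();

  /** Stand-in for the expensive schema parse; only counts how often it runs. */
  static String parse(String json) {
    PARSE_CALLS.incrementAndGet();
    return "parsed(" + json + ")";
  }

  /** Serializes and deserializes the closure, mimicking delivery to a remote worker. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T shipToWorker(T closure) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        out.writeObject(closure);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Returns how many times parse() ran when the closure is shipped {@code copies} times. */
  static int countParses(int copies) {
    PARSE_CALLS.set(0);
    String jsonSchema = "{\"type\":\"string\"}";
    final String[] schema = {null};
    SerializableSupplier<String> closure = () -> {
      if (schema[0] == null) {
        schema[0] = parse(jsonSchema); // fills the copy of the array owned by this closure
      }
      return schema[0];
    };
    for (int i = 0; i < copies; i++) {
      SerializableSupplier<String> workerCopy = shipToWorker(closure);
      workerCopy.get(); // first call parses
      workerCopy.get(); // second call reuses the cached value
    }
    if (schema[0] != null) {
      throw new IllegalStateException("the original array should never be filled");
    }
    return PARSE_CALLS.get();
  }

  public static void main(String[] args) {
    System.out.println(countParses(3)); // prints 3
  }
}
```

Each deserialized copy of the closure carries its own array, so the parse runs once per copy and the original array stays empty. A broadcast variable makes that sharing explicit instead of relying on how captured state happens to be serialized.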



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
