nsivabalan commented on a change in pull request #2666:
URL: https://github.com/apache/hudi/pull/2666#discussion_r605601097



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -79,6 +80,7 @@
   public static final String TIMELINE_LAYOUT_VERSION = 
"hoodie.timeline.layout.version";
   public static final String BASE_PATH_PROP = "hoodie.base.path";
   public static final String AVRO_SCHEMA = "hoodie.avro.schema";
+  public static final String LAST_AVRO_SCHEMA = "hoodie.last.avro.schema";

Review comment:
       Maybe we can call this "latest table schema".

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -106,7 +110,7 @@
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
                            Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
                            TaskContextSupplier taskContextSupplier) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier);
+    super(config, instantTime, partitionPath, fileId, hoodieTable, 
getWriterSchemaIncludingAndExcludingMetadataPair(config, hoodieTable), 
taskContextSupplier);

Review comment:
       Can you help me understand something? Here we have HoodieWriteConfig in
two places: as a separate entity (the first constructor argument) and via
HoodieTable.getConfig(). Within
getWriterSchemaIncludingAndExcludingMetadataPair(...), we update lastSchema in
the HoodieWriteConfig, which updates the first argument. But table.getConfig()
may not be updated, right?
   If the above is right, how can we rely on
table.getConfig().getLastSchema() in the *MergeHelper classes?
   Maybe I am missing something. Can you shed some light?
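
   To make the concern concrete, here is a minimal standalone sketch. It
assumes the config passed to the constructor and the one held by the table
can be distinct instances; WriteConfigSketch and TableSketch are hypothetical
stand-ins, not Hudi classes.

```java
// Hypothetical stand-ins for HoodieWriteConfig and HoodieTable; not Hudi code.
class WriteConfigSketch {
  private String lastSchema; // stays null until someone sets it

  void setLastSchema(String schema) {
    this.lastSchema = schema;
  }

  String getLastSchema() {
    return lastSchema;
  }
}

class TableSketch {
  private final WriteConfigSketch config;

  TableSketch(WriteConfigSketch config) {
    this.config = config;
  }

  WriteConfigSketch getConfig() {
    return config;
  }
}

class ConfigAliasingDemo {
  // Sets lastSchema on handleConfig and returns what the table's own config sees.
  static String lastSchemaSeenByTable(WriteConfigSketch handleConfig, TableSketch table) {
    handleConfig.setLastSchema("{\"type\":\"record\"}");
    return table.getConfig().getLastSchema();
  }

  public static void main(String[] args) {
    // Case 1 (assumption): the table was built with a separate config instance,
    // so the update made through handleConfig is not visible through the table.
    WriteConfigSketch handleConfig = new WriteConfigSketch();
    TableSketch tableWithOwnConfig = new TableSketch(new WriteConfigSketch());
    System.out.println(lastSchemaSeenByTable(handleConfig, tableWithOwnConfig));

    // Case 2: both refer to the same instance, so the update is visible.
    WriteConfigSketch shared = new WriteConfigSketch();
    System.out.println(lastSchemaSeenByTable(shared, new TableSketch(shared)));
  }
}
```

   If the two are always the same instance in practice, the concern goes
away; the sketch only shows what happens when they are not.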

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -59,14 +61,24 @@ public static HoodieCommitMetadata 
buildMetadata(List<HoodieWriteStat> writeStat
                                                    Option<Map<String, String>> 
extraMetadata,
                                                    WriteOperationType 
operationType,
                                                    String 
schemaToStoreInCommit,
-                                                   String commitActionType) {
+                                                   String commitActionType,
+                                                   Boolean updatePartialFields,
+                                                   HoodieTableMetaClient 
metaClient) {
 
     HoodieCommitMetadata commitMetadata = buildMetadataFromStats(writeStats, 
partitionToReplaceFileIds, commitActionType, operationType);
 
     // add in extra metadata
     if (extraMetadata.isPresent()) {
       extraMetadata.get().forEach(commitMetadata::addMetadata);
     }
+    if (updatePartialFields) {
+      try {

Review comment:
       If updatePartialFields is set, can't we rely on config.getLastSchema()
in all places where this method is called, similar to how we pass in
config.getSchema()? Is it not guaranteed that the last schema will already be
set on the config by the time we reach here if updatePartialFields is true?
   I am trying to see if we can avoid parsing the schema from the table again,
since we have already done it once.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {

Review comment:
       Javadocs with an example would be great.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -123,6 +127,22 @@ public HoodieMergeHandle(HoodieWriteConfig config, String 
instantTime, HoodieTab
     init(fileId, this.partitionPath, dataFileToBeMerged);
   }
 
+  protected static Pair<Schema, Schema> 
getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config, 
HoodieTable hoodieTable) {
+    Schema originalSchema = new Schema.Parser().parse(config.getSchema());
+    Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
+    boolean updatePartialFields = config.updatePartialFields();
+    if (updatePartialFields) {
+      try {
+        TableSchemaResolver resolver = new 
TableSchemaResolver(hoodieTable.getMetaClient());
+        Schema lastSchema = resolver.getTableAvroSchema();
+        config.setLastSchema(lastSchema.toString());

Review comment:
       In fact, I would prefer to throw an exception if the table schema is not
present and someone is trying to use updatePartialFields.
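
   A rough sketch of the fail-fast behavior, under stated assumptions: the
schema is modeled as a plain String, the Optional argument stands in for
whatever the TableSchemaResolver lookup returns, and LastSchemaGuard and the
exception type are hypothetical choices, not existing Hudi code.

```java
import java.util.Optional;

// Hypothetical sketch; not part of the PR or of Hudi.
class LastSchemaGuard {
  // Returns the last table schema when partial updates are enabled.
  // Throws if partial updates are requested but no table schema exists yet.
  static Optional<String> resolveLastSchema(boolean updatePartialFields,
                                            Optional<String> tableSchema) {
    if (!updatePartialFields) {
      return Optional.empty(); // nothing to resolve when the feature is off
    }
    return Optional.of(tableSchema.orElseThrow(() -> new IllegalStateException(
        "updatePartialFields is enabled but the table has no schema yet")));
  }

  public static void main(String[] args) {
    System.out.println(resolveLastSchema(true, Optional.of("{\"type\":\"record\"}")));
    try {
      resolveLastSchema(true, Optional.empty());
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   Failing here surfaces the misconfiguration at handle construction instead
of as a NullPointerException later in the merge path.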

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
+  public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public PartialUpdatePayload(Option<GenericRecord> record) {
+    this(record.get(), (record1) -> 0); // natural order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
lastValue, Schema schema) throws IOException {
+    Option<IndexedRecord> recordOption = getInsertValue(schema);
+    if (recordOption.isPresent()) {
+      IndexedRecord record = recordOption.get();
+      GenericRecord current = (GenericRecord) record;
+
+      List<Schema.Field> fieldList = schema.getFields();
+      GenericRecord last = (GenericRecord) lastValue;
+      for (Schema.Field field : fieldList) {
+        last.put(field.name(), current.get(field.name()));
+      }
+      return Option.ofNullable(last);
+    }
+    return recordOption;
+  }
+} 

Review comment:
       Line break, please.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
+  public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public PartialUpdatePayload(Option<GenericRecord> record) {
+    this(record.get(), (record1) -> 0); // natural order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
lastValue, Schema schema) throws IOException {

Review comment:
       schema here refers to the incoming partial schema, right?
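
   Why the answer matters can be shown with a small standalone sketch. It is
only an illustration: records are modeled as maps and the schema as a list of
field names so it runs without Avro, and PartialMergeSketch and the field
names are made up. Unlike the loop in the PR, it copies the existing record
rather than mutating it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; records are maps and a schema is a list of field names.
class PartialMergeSketch {
  // Overwrites only the fields named in schemaFields, on a copy of existing.
  static Map<String, Object> merge(Map<String, Object> existing,
                                   Map<String, Object> incoming,
                                   List<String> schemaFields) {
    Map<String, Object> merged = new LinkedHashMap<>(existing);
    for (String field : schemaFields) {
      merged.put(field, incoming.get(field));
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> existing = new LinkedHashMap<>();
    existing.put("id", 1);
    existing.put("name", "widget");
    existing.put("price", 10);

    // The incoming record carries only the key and the field being updated.
    Map<String, Object> incoming = new LinkedHashMap<>();
    incoming.put("id", 1);
    incoming.put("price", 12);

    // Partial schema: name is left untouched.
    System.out.println(merge(existing, incoming, Arrays.asList("id", "price")));
    // Full schema: name is overwritten with null, because incoming lacks it.
    System.out.println(merge(existing, incoming, Arrays.asList("id", "name", "price")));
  }
}
```

   In this sketch, iterating over the partial field list preserves name,
while iterating over the full field list wipes it.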

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -123,6 +127,22 @@ public HoodieMergeHandle(HoodieWriteConfig config, String 
instantTime, HoodieTab
     init(fileId, this.partitionPath, dataFileToBeMerged);
   }
 
+  protected static Pair<Schema, Schema> 
getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config, 
HoodieTable hoodieTable) {
+    Schema originalSchema = new Schema.Parser().parse(config.getSchema());
+    Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
+    boolean updatePartialFields = config.updatePartialFields();
+    if (updatePartialFields) {
+      try {
+        TableSchemaResolver resolver = new 
TableSchemaResolver(hoodieTable.getMetaClient());
+        Schema lastSchema = resolver.getTableAvroSchema();
+        config.setLastSchema(lastSchema.toString());

Review comment:
       lastSchema could be null if this is the first commit, or if the last
commit is an operation that does not inject the schema into the commit
metadata. Can we check that it is not null before setting the last schema?
Otherwise, .toString() could throw a NullPointerException.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
+  public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public PartialUpdatePayload(Option<GenericRecord> record) {
+    this(record.get(), (record1) -> 0); // natural order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
lastValue, Schema schema) throws IOException {

Review comment:
       nit: instead of "lastValue", maybe "existingValue" or
"existingStorageValue".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

