yanghua commented on a change in pull request #2333:
URL: https://github.com/apache/hudi/pull/2333#discussion_r544382516



##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -179,7 +184,6 @@ public RawTripTestPayload 
generateRandomValueAsPerSchema(String schemaStr, Hoodi
     } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
       return generatePayloadForShortTripSchema(key, commitTime);
     }
-

Review comment:
       Please avoid making unnecessary changes to this line.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -179,7 +184,6 @@ public RawTripTestPayload 
generateRandomValueAsPerSchema(String schemaStr, Hoodi
     } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
       return generatePayloadForShortTripSchema(key, commitTime);
     }
-

Review comment:
       Please avoid making unnecessary changes to this line.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
##########
@@ -1400,7 +1403,9 @@ public void testReplaceFileIdIsExcludedInView() throws 
IOException {
     replacedFileIdsP2.add(fileId4);
     partitionToReplaceFileIds.put(partitionPath2, replacedFileIdsP2);
     HoodieCommitMetadata commitMetadata =
-        CommitUtils.buildMetadata(Collections.emptyList(), 
partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, 
"", HoodieTimeline.REPLACE_COMMIT_ACTION);
+        CommitUtils.buildMetadata(Collections.emptyList(), 
partitionToReplaceFileIds, Option.empty(),
+            WriteOperationType.INSERT_OVERWRITE, "", false,
+            HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);

Review comment:
       Can we merge this line with the previous?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
+  public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) {

Review comment:
       Please add a blank line to separate the constructor from the class definition.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -191,7 +207,11 @@ protected long getAttemptId() {
   }
 
   protected HoodieFileWriter createNewFileWriter(String instantTime, Path 
path, HoodieTable<T, I, K, O> hoodieTable,
-      HoodieWriteConfig config, Schema schema, TaskContextSupplier 
taskContextSupplier) throws IOException {
-    return HoodieFileWriterFactory.getFileWriter(instantTime, path, 
hoodieTable, config, schema, taskContextSupplier);
+                                                 HoodieWriteConfig config, 
Schema schema, TaskContextSupplier taskContextSupplier) throws IOException {

Review comment:
       Please keep the original code style.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
##########
@@ -50,7 +50,8 @@ public void testCommitMetadataCreation() {
         Option.empty(),
         WriteOperationType.INSERT,
         TRIP_SCHEMA,
-        HoodieTimeline.DELTA_COMMIT_ACTION);
+        false,
+        HoodieTimeline.DELTA_COMMIT_ACTION, null);

Review comment:
       Please move the `null` argument onto its own line.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -58,7 +60,9 @@ public static HoodieCommitMetadata 
buildMetadata(List<HoodieWriteStat> writeStat
                                                    Option<Map<String, String>> 
extraMetadata,
                                                    WriteOperationType 
operationType,
                                                    String 
schemaToStoreInCommit,
-                                                   String commitActionType) {
+                                                   Boolean updatePartialFields,

Review comment:
       Actually, IMO, it would be better to add the new parameter at the end of the parameter list.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
##########
@@ -1322,7 +1322,10 @@ public void testReplaceWithTimeTravel() throws 
IOException {
     replacedFileIds.add(fileId1);
     partitionToReplaceFileIds.put(partitionPath1, replacedFileIds);
     HoodieCommitMetadata commitMetadata =
-        CommitUtils.buildMetadata(Collections.emptyList(), 
partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, 
"", HoodieTimeline.REPLACE_COMMIT_ACTION);
+        CommitUtils.buildMetadata(Collections.emptyList(), 
partitionToReplaceFileIds, Option.empty(),
+            WriteOperationType.INSERT_OVERWRITE, "", false,
+            HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);

Review comment:
       Can we merge this line with the previous?
   
   

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
##########
@@ -74,7 +75,8 @@ public void testReplaceMetadataCreation() {
         Option.empty(),
         WriteOperationType.INSERT,
         TRIP_SCHEMA,
-        HoodieTimeline.REPLACE_COMMIT_ACTION);
+        false,
+        HoodieTimeline.REPLACE_COMMIT_ACTION, null);

Review comment:
       ditto

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -67,6 +71,18 @@ public static HoodieCommitMetadata 
buildMetadata(List<HoodieWriteStat> writeStat
       extraMetadata.get().forEach(commitMetadata::addMetadata);
     }
     commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
schemaToStoreInCommit);
+
+    if (updatePartialFields) {
+      try {
+        TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+        schemaToStoreInCommit = 
resolver.getTableAvroSchemaWithoutMetadataFields().toString();
+      } catch (Exception e) {
+        // ignore exception.
+      }
+    }
+    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
schemaToStoreInCommit);
+
+

Review comment:
       Why two empty lines?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodiePartialUpdate.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.PartialUpdatePayload;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_TRIP_SCHEMA;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_SCHEMA;
+import static 
org.apache.hudi.common.testutils.Transformations.recordsToHoodieKeys;
+import static org.apache.hudi.common.util.ParquetUtils.readAvroRecords;
+import static org.apache.hudi.common.util.ParquetUtils.readAvroSchema;
+import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodiePartialUpdate extends HoodieClientTestBase {
+
+  @Test
+  public void testCopyOnWritePartialUpdate() {
+    final String testPartitionPath = "2016/09/26";
+    SparkRDDWriteClient client = getHoodieWriteClient(getConfig(true, false));
+    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+
+    String commitTime1 = "001";
+    client.startCommitWithTime(commitTime1);
+
+    List<HoodieRecord> inserts1 =
+            dataGen.generateInsertsStream(commitTime1, 100, false, 
TRIP_SCHEMA).collect(Collectors.toList()); // this writes ~500kb
+
+    List<HoodieKey> insertKeys = recordsToHoodieKeys(inserts1);
+    upsertAndCheck(client, insertKeys, commitTime1, false);
+
+    client = getHoodieWriteClient(getConfig(true, true));
+    String commitTime2 = "002";
+    client.startCommitWithTime(commitTime2);
+
+    WriteStatus writeStatus = upsertAndCheck(client, insertKeys, commitTime2, 
true);
+
+    Schema schema = readAvroSchema(hadoopConf, new Path(basePath, 
writeStatus.getStat().getPath()));
+    List<String> oldSchemaFieldNames = 
AVRO_TRIP_SCHEMA.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+    List<String> parquetFieldNames = 
schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+
+    for (String name : oldSchemaFieldNames) {
+      assertTrue(parquetFieldNames.contains(name));
+    }
+
+    List<GenericRecord> records1 = readAvroRecords(hadoopConf, new 
Path(basePath, writeStatus.getStat().getPath()));
+    for (GenericRecord record : records1) {
+      assertEquals("rider-" + commitTime1, record.get("rider").toString());
+      assertEquals("driver-" + commitTime1, record.get("driver").toString());
+      assertEquals(String.valueOf(1L), record.get("timestamp").toString());
+    }
+  }
+
+  private WriteStatus upsertAndCheck(SparkRDDWriteClient client, 
List<HoodieKey> insertKeys, String commitTime, boolean partial) {
+    List<HoodieRecord> records = new ArrayList<>();
+    for (HoodieKey hoodieKey : insertKeys) {
+      PartialUpdatePayload payload;
+      if (partial) {
+        payload = 
dataGen.generatePartialUpdatePayloadForPartialTripSchema(hoodieKey, commitTime);
+      } else {
+        payload = dataGen.generatePartialUpdatePayloadForTripSchema(hoodieKey, 
commitTime);
+      }
+      HoodieRecord record = new HoodieRecord(hoodieKey, payload);
+      records.add(record);
+    }
+
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records, 1);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, 
commitTime).collect();
+
+    assertNoWriteErrors(statuses);
+
+    assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
+    assertEquals(100,
+            readRowKeysFromParquet(hadoopConf, new Path(basePath, 
statuses.get(0).getStat().getPath()))
+                    .size(), "file should contain 100 records");

Review comment:
       This indentation is inconsistent; please align it with the project's code style.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
##########
@@ -92,7 +92,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String 
instantTime, HoodieTa
    */
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
       String partitionPath, String fileId, Map<String, HoodieRecord<T>> 
recordMap,
-      TaskContextSupplier taskContextSupplier) {
+                            TaskContextSupplier taskContextSupplier) {

Review comment:
       Don't change the indent.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
+  public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public PartialUpdatePayload(Option<GenericRecord> record) {
+    this(record.get(), (record1) -> 0); // natural order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema) throws IOException {
+    Option<IndexedRecord> recordOption = getInsertValue(schema);
+    if (recordOption.isPresent()) {
+      IndexedRecord record = recordOption.get();
+      GenericRecord current = (GenericRecord) record;

Review comment:
       Don't mix up the names. Let's call the old record `currentValue` and the new one
`insertValue`, OK?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to