yanghua commented on a change in pull request #3888:
URL: https://github.com/apache/hudi/pull/3888#discussion_r743495556



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -384,6 +385,9 @@
          + " but for the case the write schema is not equal to the specified table schema, we can"
          + " specify the write schema by this parameter. Used by MergeIntoHoodieTableCommand");
 
+  public static final String UPDATE_JOIN_FIELDS = "hoodie.update.join.fields";
+  public static final String UPDATE_NULL_FIELDS = "hoodie.update.null.fields";

Review comment:
       More comments need to be filled in here.
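       For example, each key could be documented the way the surrounding configs are. This is only a sketch: the documentation text below is a guess at the intended semantics, and the `ConfigProperty` builder is assumed from elsewhere in this class:
       ```java
       public static final ConfigProperty<String> UPDATE_JOIN_FIELDS = ConfigProperty
           .key("hoodie.update.join.fields")
           .noDefaultValue()
           .withDocumentation("Comma-separated list of fields used to join incoming"
               + " update records against existing records during a row-level update.");

       public static final ConfigProperty<String> UPDATE_NULL_FIELDS = ConfigProperty
           .key("hoodie.update.null.fields")
           .noDefaultValue()
           .withDocumentation("Comma-separated list of fields that should be set to"
               + " null on matched records during a row-level update.");
       ```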

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowUpdateHandle.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class HoodieRowUpdateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowUpdateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieInternalRowFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final StructType structType;
+  private final String prevFileName;
+  private final int shouldUpdateFieldIndex;
+  private final Set<Integer> updateNullFieldIndexSet;
+  Map<Integer, Pair<Integer, DataType>> updateValueFieldMap;
+  private final FileSystem fs;
+  private final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  private boolean isFileUpdated = false;
+  private long writeTimeAccumulator = 0;
+  private long updateRecordsWritten = 0;
+
+  private Long greatestSequenceNumber = -1L;
+
+  public HoodieRowUpdateHandle(
+      HoodieTable table,
+      HoodieWriteConfig writeConfig,
+      String partitionPath,
+      String fileId,
+      String instantTime,
+      int taskPartitionId,
+      long taskId,
+      long taskEpochId,
+      StructType structType,
+      String prevFileName,
+      int shouldUpdateFieldIndex,
+      Set<Integer> updateNullFieldIndexSet,
+      Map<Integer, Pair<Integer, DataType>> updateValueFieldMap) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    this.structType = structType;
+    this.prevFileName = prevFileName;
+    this.shouldUpdateFieldIndex = shouldUpdateFieldIndex;
+    this.updateNullFieldIndexSet = updateNullFieldIndexSet;
+    this.updateValueFieldMap = updateValueFieldMap;
+    this.isFileUpdated = false;
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
+   * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+   *
+   * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
+   * @throws IOException
+   */
+  public void write(InternalRow record) throws IOException {
+    long start = System.currentTimeMillis();
+    try {
+      String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
+          HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+      String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+      long sequenceNumber = Long.parseLong(seqId.split("_")[2]);
+      if (sequenceNumber < greatestSequenceNumber) {
+        throw new RuntimeException("Wrong sequence number, records should be sorted");
+      } else {
+        greatestSequenceNumber = sequenceNumber;
+      }
+      Integer recordKeyPos = HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS
+          .get(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      if (!record.isNullAt(shouldUpdateFieldIndex)) {
+        isFileUpdated = true;
+        updateRecordsWritten++;
+        for (Integer updateNullFieldIndex : updateNullFieldIndexSet) {
+          record.setNullAt(updateNullFieldIndex);
+        }
+        for (Entry<Integer, Pair<Integer, DataType>> entry : updateValueFieldMap.entrySet()) {
+          record.update(entry.getKey(), record.get(entry.getValue().getKey(), entry.getValue().getValue()));
+        }
+      }
+      String recordKey = null;
+      if (!record.isNullAt(recordKeyPos)) {
+        recordKey = record.getUTF8String(recordKeyPos).toString();
+      }
+
+      HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, seqId, recordKey, partitionPath, path.getName(),
+          new InternalRowWrapper(structType.length(), record));
+      try {
+        fileWriter.writeRow(recordKey, internalRow);
+        writeStatus.markSuccess(recordKey);
+      } catch (Throwable t) {
+        writeStatus.markFailure(recordKey, t);
+      }
+    } catch (Throwable ge) {
+      writeStatus.setGlobalError(ge);
+      throw ge;
+    }
+    writeTimeAccumulator += (System.currentTimeMillis() - start);
+  }
+
+  public HoodieInternalWriteStatus close() throws IOException {
+    long start = System.currentTimeMillis();
+    fileWriter.close();
+    if (isFileUpdated) {
+      HoodieWriteStat stat = new HoodieWriteStat();
+      stat.setPartitionPath(partitionPath);
+      stat.setNumWrites(writeStatus.getTotalRecords());
+      stat.setNumDeletes(0);
+      stat.setNumInserts(0);
+      stat.setNumUpdateWrites(updateRecordsWritten);
+      stat.setPrevCommit(prevFileName);
+      stat.setFileId(fileId);
+      stat.setPath(new Path(writeConfig.getBasePath()), path);
+      long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getFs(), path);
+      stat.setTotalWriteBytes(fileSizeInBytes);
+      stat.setFileSizeInBytes(fileSizeInBytes);
+      stat.setTotalWriteErrors(writeStatus.getFailedRowsSize());
+      HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
+      long processTime = currTimer.endTimer();
+      long writeTime = (System.currentTimeMillis() - start + writeTimeAccumulator) / 1000;
+      LOG.info("Finish updating file " + getFileName() + " of partition " + partitionPath
+          + ", takes " + processTime / 1000 + " seconds to process and " + writeTime
+          + " seconds to write.");
+      LOG.info("Finish updating file " + getFileName() + " of partition " + partitionPath
+          + ", takes " + processTime / 1000 + " seconds to process and "
+          + (System.currentTimeMillis() - start) / 1000 + " seconds to write.");
+      runtimeStats.setTotalCreateTime(processTime);
+      stat.setRuntimeStats(runtimeStats);
+      writeStatus.setStat(stat);
+      return writeStatus;
+    } else {
+      try {
+        fs.delete(path, false);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete unnecessary file " + path.toString());

Review comment:
       Append the exception `e` to the log call for more detailed information?
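       For example:
       ```java
       LOG.warn("Failed to delete unnecessary file " + path, e);
       ```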

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowUpdateHandle.java
##########
@@ -0,0 +1,412 @@
+  public HoodieInternalWriteStatus close() throws IOException {
+    long start = System.currentTimeMillis();
+    fileWriter.close();
+    if (isFileUpdated) {
+      HoodieWriteStat stat = new HoodieWriteStat();
+      stat.setPartitionPath(partitionPath);
+      stat.setNumWrites(writeStatus.getTotalRecords());
+      stat.setNumDeletes(0);
+      stat.setNumInserts(0);
+      stat.setNumUpdateWrites(updateRecordsWritten);
+      stat.setPrevCommit(prevFileName);
+      stat.setFileId(fileId);
+      stat.setPath(new Path(writeConfig.getBasePath()), path);
+      long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getFs(), path);
+      stat.setTotalWriteBytes(fileSizeInBytes);
+      stat.setFileSizeInBytes(fileSizeInBytes);
+      stat.setTotalWriteErrors(writeStatus.getFailedRowsSize());
+      HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
+      long processTime = currTimer.endTimer();
+      long writeTime = (System.currentTimeMillis() - start + writeTimeAccumulator) / 1000;
+      LOG.info("Finish updating file " + getFileName() + " of partition " + partitionPath
+          + ", takes " + processTime / 1000 + " seconds to process and " + writeTime
+          + " seconds to write.");
+      LOG.info("Finish updating file " + getFileName() + " of partition " + partitionPath
+          + ", takes " + processTime / 1000 + " seconds to process and "
+          + (System.currentTimeMillis() - start) / 1000 + " seconds to write.");

Review comment:
       Could this be made more readable? Why are there two log statements, and there are a lot of `+` concatenations.
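       For example, the two statements could collapse into a single `String.format` call (a sketch; the variable names are taken from the surrounding method):
       ```java
       LOG.info(String.format(
           "Finished updating file %s of partition %s: took %d seconds to process and %d seconds to write.",
           getFileName(), partitionPath, processTime / 1000, writeTime));
       ```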

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -384,6 +385,9 @@
          + " but for the case the write schema is not equal to the specified table schema, we can"
          + " specify the write schema by this parameter. Used by MergeIntoHoodieTableCommand");
 
+  public static final String UPDATE_JOIN_FIELDS = "hoodie.update.join.fields";
+  public static final String UPDATE_NULL_FIELDS = "hoodie.update.null.fields";
+  public static final String UPDATE_PARTITION_PATHS = "hoodie.update.partition.paths";

Review comment:
       Add an empty line here?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -47,6 +49,14 @@
   public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEngineContext context, HoodieTable table,
                                            HoodieWriteConfig config) {
     super(profile, context, table, config);
+    // todo

Review comment:
       `todo` for what? It should describe what remains to be done, or be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
