garyli1019 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676092014



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link RowData} and keeps meta columns locally. But the {@link RowData}

Review comment:
       `Flink RowData` instead of `Internal Row`? 

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link RowData} and keeps meta columns locally. But the {@link RowData}
+ * does include the meta columns as well just that {@link HoodieRowData} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link RowData}.
+ */
+public class HoodieRowData implements RowData {
+
+  private final String commitTime;
+  private final String commitSeqNumber;
+  private final String recordKey;
+  private final String partitionPath;
+  private final String fileName;
+  private final RowData row;
+  private final int metaColumnsNum;
+
+  public HoodieRowData(String commitTime,
+                       String commitSeqNumber,
+                       String recordKey,
+                       String partitionPath,
+                       String fileName,
+                       RowData row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+    this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
+  }
+
+  @Override
+  public int getArity() {
+    return 5 + row.getArity();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+    return row.getRowKind();
+  }
+
+  @Override
+  public void setRowKind(RowKind kind) {
+    this.row.setRowKind(kind);
+  }
+
+  private String getMetaColumnVal(int ordinal) {
+    switch (ordinal) {
+      case 0: {
+        return commitTime;
+      }
+      case 1: {
+        return commitSeqNumber;
+      }
+      case 2: {
+        return recordKey;
+      }
+      case 3: {
+        return partitionPath;
+      }
+      case 4: {
+        return fileName;
+      }
+      default:
+        throw new IllegalArgumentException("Not expected");
+    }
+  }
+
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaColumnsNum) {
+      return null == getMetaColumnVal(ordinal);
+    }
+    return row.isNullAt(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public boolean getBoolean(int ordinal) {
+    return row.getBoolean(ordinal - metaColumnsNum);

Review comment:
       Should we treat the meta columns as normal columns? This looks a bit tricky to me.
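
       For context, a minimal illustration of how the ordinal mapping behaves under the current design. It assumes the remaining getters (e.g. `getString`, `getInt`) follow the same offset pattern as `isNullAt`/`getBoolean`; the class name and all literal values are made up:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

import org.apache.hudi.client.model.HoodieRowData;

public class HoodieRowDataOrdinalDemo {
  public static void main(String[] args) {
    // Payload row with two fields: a string key and an int value.
    RowData payload = GenericRowData.of(StringData.fromString("uuid-1"), 42);

    // The five meta columns occupy ordinals 0..4 and are served from the local copies.
    RowData hoodieRow = new HoodieRowData(
        "20210723000000",      // _hoodie_commit_time
        "20210723000000_0_1",  // _hoodie_commit_seqno
        "uuid-1",              // _hoodie_record_key
        "par1",                // _hoodie_partition_path
        "file-1.parquet",      // _hoodie_file_name
        payload);

    System.out.println(hoodieRow.isNullAt(2)); // false: record key is read from the local copy, not from payload
    System.out.println(hoodieRow.getInt(6));   // 42: delegated as payload.getInt(6 - metaColumnsNum)
  }
}
```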

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
       Can we handle the keygen similarly to Spark's `BuiltinKeyGenerator`? I think that would make further optimization easier.
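
       For reference, a rough sketch of what the single-field fast path in `RowDataKeyGen` appears to boil down to; the field positions, types, and values below are made up for illustration:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.VarCharType;

public class SimpleRowDataKeyGenSketch {
  public static void main(String[] args) {
    // Assume field 0 is the record key and field 1 the partition path (both VARCHAR).
    RowData.FieldGetter recordKeyGetter = RowData.createFieldGetter(new VarCharType(VarCharType.MAX_LENGTH), 0);
    RowData.FieldGetter partitionPathGetter = RowData.createFieldGetter(new VarCharType(VarCharType.MAX_LENGTH), 1);

    RowData row = GenericRowData.of(StringData.fromString("uuid-1"), StringData.fromString("par1"));

    // The "efficient code path" reads the raw field once per record instead of projecting the whole row.
    String recordKey = String.valueOf(recordKeyGetter.getFieldOrNull(row));         // "uuid-1"
    String partitionPath = String.valueOf(partitionPathGetter.getFieldOrNull(row)); // "par1"
    System.out.println(recordKey + " / " + partitionPath);
  }
}
```

       A `BuiltinKeyGenerator`-style abstraction would presumably put this kind of field access behind a common interface shared with the existing key generators, which is what would make later optimization easier.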

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieRowData;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle with RowData for datasource implementation of bulk insert.
+ */
+public class HoodieRowDataCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowDataCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  protected final HoodieRowDataFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  protected final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
+                                   String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+                                   RowType rowType) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link RowData} to the underlying {@link HoodieRowDataFileWriter}.
+   * Before writing, value for meta columns are computed as required
+   * and wrapped in {@link HoodieRowData}. {@link HoodieRowData} is what gets written to HoodieRowDataFileWriter.
+   *
+   * @param recordKey     The record key
+   * @param partitionPath The partition path
+   * @param record        instance of {@link RowData} that needs to be written to the fileWriter.
+   * @throws IOException
+   */
+  public void write(String recordKey, String partitionPath, RowData record) throws IOException {
+    try {
+      String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+      HoodieRowData rowData = new HoodieRowData(instantTime, seqId, recordKey, partitionPath, path.getName(),
+          record);
+      try {
+        fileWriter.writeRow(recordKey, rowData);
+        writeStatus.markSuccess(recordKey);
+      } catch (Throwable t) {
+        writeStatus.markFailure(recordKey, t);
+      }
+    } catch (Throwable ge) {
+      writeStatus.setGlobalError(ge);
+      throw ge;
+    }
+  }
+
+  /**
+   * @returns {@code true} if this handle can take in more writes. else {@code false}.
+   */
+  public boolean canWrite() {
+    return fileWriter.canWrite();
+  }
+
+  /**
+   * Closes the {@link HoodieRowDataCreateHandle} and returns an instance of {@link HoodieInternalWriteStatus} containing the stats and
+   * status of the writes to this handle.
+   *
+   * @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle.
+   * @throws IOException
+   */
+  public HoodieInternalWriteStatus close() throws IOException {
+    fileWriter.close();
+    HoodieWriteStat stat = new HoodieWriteStat();

Review comment:
       This might cause a bug. I submitted a fix here: https://github.com/apache/hudi/pull/3341

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * <p>The function should only be used in operation type {@link WriteOperationType#BULK_INSERT}.
+ *
+ * <p>Note: The function task requires the input stream be shuffled by partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction<I, O>
+    extends ProcessFunction<I, O> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+  /**
+   * Helper class for bulk insert mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The initial inflight instant when start up.
+   */
+  private volatile String initInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
+   * Constructs a StreamingSinkFunction.
+   *
+   * @param config The config options
+   */
+  public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+    this.config = config;
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+    this.actionType = CommitUtils.getCommitActionType(
+        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

Review comment:
       Should we make this function work only for COW tables and the `BULK_INSERT` operation?
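
       If we do restrict it, a minimal sketch of a guard in `open()` could look like the following; the exception message and exact placement are just an illustration, not part of this PR:

```java
// Hypothetical up-front validation: only COPY_ON_WRITE + BULK_INSERT are accepted.
WriteOperationType operationType =
    WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION));
HoodieTableType tableType =
    HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE));
if (operationType != WriteOperationType.BULK_INSERT
    || tableType != HoodieTableType.COPY_ON_WRITE) {
  throw new HoodieException("BulkInsertWriteFunction only supports BULK_INSERT on COPY_ON_WRITE tables, but got "
      + operationType + " on " + tableType);
}
```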

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+
+import java.util.HashMap;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing {@link RowData} to Parquet.
+ */
+public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport {
+
+  private final Configuration hadoopConf;
+  private final BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, BloomFilter bloomFilter) {
+    super(rowType);
+    this.hadoopConf = new Configuration(conf);
+    this.bloomFilter = bloomFilter;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  @Override
+  public WriteSupport.FinalizedWriteContext finalizeWrite() {
+    HashMap<String, String> extraMetaData = new HashMap<>();

Review comment:
       `parquetExtraMetadata` or `baseFileMetadata`? `extraMetaData` is a bit ambiguous, since it can be confused with the extra metadata in the commit file.
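
       For reference, a rough sketch of how the renamed map might be filled in `finalizeWrite()`. The bloom filter and min/max record key handling below is an assumption modeled on the existing Avro write support, not copied from this PR:

```java
@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
  HashMap<String, String> baseFileMetadata = new HashMap<>();
  if (bloomFilter != null) {
    // Serialized bloom filter plus min/max record keys written into the parquet footer.
    baseFileMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
    if (minRecordKey != null && maxRecordKey != null) {
      baseFileMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
      baseFileMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
    }
    if (bloomFilter instanceof HoodieDynamicBoundedBloomFilter) {
      baseFileMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
    }
  }
  return new WriteSupport.FinalizedWriteContext(baseFileMetadata);
}
```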

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link RowData} and keeps meta columns locally. But the {@link RowData}
+ * does include the meta columns as well just that {@link HoodieRowData} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link RowData}.
+ */
+public class HoodieRowData implements RowData {
+
+  private final String commitTime;
+  private final String commitSeqNumber;
+  private final String recordKey;
+  private final String partitionPath;
+  private final String fileName;
+  private final RowData row;
+  private final int metaColumnsNum;
+
+  public HoodieRowData(String commitTime,
+                       String commitSeqNumber,
+                       String recordKey,
+                       String partitionPath,
+                       String fileName,
+                       RowData row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+    this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
+  }
+
+  @Override
+  public int getArity() {
+    return 5 + row.getArity();

Review comment:
       Use `metaColumnsNum` instead of the hardcoded 5.
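
       i.e. something like:

```java
@Override
public int getArity() {
  return metaColumnsNum + row.getArity();
}
```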




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
