nsivabalan commented on a change in pull request #4067:
URL: https://github.com/apache/hudi/pull/4067#discussion_r768268672



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -137,6 +136,14 @@
      .withDocumentation("There are cases when extra files are requested to be deleted from metadata table which was never added before. This config"
          + "determines how to handle such spurious deletes");
 
+  public static final ConfigProperty<Boolean> RECORDKEY_DE_DUPLICATE = ConfigProperty
+      .key("_" + METADATA_PREFIX + ".recordkey.deduplicate")

Review comment:
       let's try to maintain the same naming across the board:
   "_" + METADATA_PREFIX + "exclude.key.from.payload"

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
##########
@@ -98,8 +98,14 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
           new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
       partitionMetadata.trySave(getPartitionId());
       createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
+
+      Option<String> keyField = Option.ofNullable(hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp());

Review comment:
       nit: rename to recordKeyField

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -182,9 +185,14 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
       // Create Marker file
       createMarkerFile(partitionPath, newFileName);
 
+      Option<Schema.Field> keySchemaFieldID = Option.empty();

Review comment:
       can we move this to the writeHandle?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
##########
@@ -73,6 +74,17 @@ static boolean isMetadataTable(String basePath) {
     return basePath.endsWith(METADATA_TABLE_REL_PATH);
   }
 
+  /**
+   * Does the given log file belong to the Metadata table?
+   *
+   * @param logFile - Log file to check
+   * @return True if the log file belongs to the Metadata table, False otherwise.
+   */
+  static boolean isMetadataTable(HoodieLogFile logFile) {

Review comment:
       do we still have callers for this?
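   For reference, the suffix-based detection in this hunk mirrors the existing `isMetadataTable(String basePath)` shown above. A minimal sketch of the idea (the class and helper names here are hypothetical, and `.hoodie/metadata` is only an assumption for what `METADATA_TABLE_REL_PATH` resolves to):

```java
// Hypothetical sketch of suffix-based metadata-table detection; the class
// name and the constant's value are assumptions, not the actual Hudi code.
class MetadataPathCheck {
  static final String METADATA_TABLE_REL_PATH = ".hoodie/metadata";

  // Mirrors isMetadataTable(String basePath) from the hunk above.
  static boolean isMetadataTable(String basePath) {
    return basePath.endsWith(METADATA_TABLE_REL_PATH);
  }

  // For a log file, the same check can be run on the table base path
  // that contains it.
  static boolean isMetadataLogFile(String logFileParentPath) {
    return isMetadataTable(logFileParentPath);
  }
}
```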

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
##########
@@ -47,7 +48,7 @@
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException {
     FileSplit fileSplit = (FileSplit) split;
     Path path = fileSplit.getPath();
-    reader = new HoodieHFileReader(conf, path, new CacheConfig(conf));
+    reader = new HoodieHFileReader(conf, path, new CacheConfig(conf), Option.empty());

Review comment:
       is it required to fetch the table props and pass in the right value for recordKeyField here?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -48,12 +48,18 @@
   public static <T extends HoodieRecordPayload, R extends IndexedRecord, I, K, O> HoodieFileWriter<R> getFileWriter(
       String instantTime, Path path, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
+    return getFileWriter(instantTime, path, hoodieTable, config, schema, Option.empty(), taskContextSupplier);
+  }
+
+  public static <T extends HoodieRecordPayload, R extends IndexedRecord, I, K, O> HoodieFileWriter<R> getFileWriter(
+      String instantTime, Path path, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config, Schema schema,
+      Option<Schema.Field> schemaKeyFieldID, TaskContextSupplier taskContextSupplier) throws IOException {

Review comment:
       recordKeySchemaField

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
##########
@@ -122,7 +131,14 @@ public boolean canWrite() {
 
   @Override
   public void writeAvro(String recordKey, IndexedRecord object) throws IOException {
-    byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord)object);
+    byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord) object);
+    if (excludeKeyFromPayload) {

Review comment:
       can we move this to constructor. one time would suffice

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
##########
@@ -122,7 +131,14 @@ public boolean canWrite() {
 
   @Override
   public void writeAvro(String recordKey, IndexedRecord object) throws IOException {
-    byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord)object);
+    byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord) object);
+    if (excludeKeyFromPayload) {
+      ValidationUtils.checkArgument(schemaKeyFieldID.isPresent(),
+          "Failed to exclude key from payload. Unknown key field for the record.");
+      GenericRecord tmp = HoodieAvroUtils.bytesToAvro(value, schema);

Review comment:
       tmp -> trimmedGenRecord or something. 
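   To illustrate the write-side trimming this hunk implements, here is a simplified stand-in: a plain `Map` plays the role of the Avro `GenericRecord`, and `KeyTrimmer` is a hypothetical helper name, not the PR's code. The exclude decision and key-field name are resolved once up front, and each write only nulls the duplicate key field, since the HFile cell key already carries the value:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the key trimming done in writeAvro:
// the record key lives in the HFile cell key, so the duplicate copy inside
// the payload can be nulled out before serialization.
class KeyTrimmer {
  private final String keyFieldName;           // resolved once, up front
  private final boolean excludeKeyFromPayload;

  KeyTrimmer(String keyFieldName, boolean excludeKeyFromPayload) {
    this.keyFieldName = keyFieldName;
    this.excludeKeyFromPayload = excludeKeyFromPayload;
  }

  // Returns a copy of the record with the key field nulled when trimming is on.
  Map<String, Object> trim(Map<String, Object> record) {
    if (!excludeKeyFromPayload) {
      return record;
    }
    Map<String, Object> trimmed = new HashMap<>(record);
    trimmed.put(keyFieldName, null); // the HFile cell key still carries the value
    return trimmed;
  }
}
```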

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileKeyExcludedDataBlock.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.io.storage.HoodieHFileKeyExcludedReader;
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HFile data block for the Metadata table records. Since the backing log format for the metadata
+ * table is the HFile KeyValue, and since the key field in the metadata record payload is a
+ * duplicate of the Key in the Cell, the redundant key field in the record can be nullified to
+ * save on storage cost. Such trimmed metadata records need to be re-materialized with the key
+ * field during deserialization.
+ */
+public class HoodieHFileKeyExcludedDataBlock extends HoodieHFileDataBlock {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileKeyExcludedDataBlock.class);
+
+  public HoodieHFileKeyExcludedDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
+                                         boolean readBlockLazily, long position, long blockSize, long blockEndPos,
+                                         Schema readerSchema, Map<HeaderMetadataType, String> header,
+                                         Map<HeaderMetadataType, String> footer, boolean enableInlineReading,
+                                         String keyField) {
+    super(logFile, inputStream, content, readBlockLazily, position, blockSize, blockEndPos, readerSchema, header,
+        footer, enableInlineReading, keyField);
+  }
+
+  public HoodieHFileKeyExcludedDataBlock(@NotNull List<IndexedRecord> records,
+                                         @NotNull Map<HeaderMetadataType, String> header, String keyField) {
+    super(records, header, keyField);
+  }
+
+  /**
+   * Serialize the metadata table record to byte buffer after any field trimming if needed.
+   *
+   * @param record         - Record to serialize
+   * @param schemaKeyField - Key field in the schema
+   * @return Serialized byte array of the metadata trimmed record
+   */
+  @Override
+  protected ByteBuffer serializeRecord(final IndexedRecord record, final Option<Schema.Field> schemaKeyField) {
+    if (!schemaKeyField.isPresent()) {
+      return super.serializeRecord(record, schemaKeyField);
+    }
+
+    ValidationUtils.checkArgument(record.getSchema() != null, "Unknown schema for the record!");

Review comment:
       do we need to validate for every record? is there any cost associated with this?

##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileKeyExcludedReader.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * HFile reader for the Metadata table log files. Metadata table records in the
+ * HFile data blocks have the redundant key field in the record payload trimmed.
+ * So, when the log reader is reading records, materialization of such trimmed
+ * records must be done before handing the records to the callers. This class
+ * takes care of Metadata table record materialization, if needed.
+ *
+ * @param <R> Metadata table record type.
+ */
+public class HoodieHFileKeyExcludedReader<R extends IndexedRecord> extends HoodieHFileReader<R> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileKeyExcludedReader.class);
+
+  public HoodieHFileKeyExcludedReader(Configuration configuration, Path path, CacheConfig cacheConfig,
+                                      Option<String> keyField) throws IOException {
+    super(configuration, path, cacheConfig, keyField);
+  }
+
+  public HoodieHFileKeyExcludedReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem inlineFs,
+                                      String keyField) throws IOException {
+    super(configuration, path, cacheConfig, inlineFs, keyField);
+  }
+
+  public HoodieHFileKeyExcludedReader(final byte[] content, final String keyField) throws IOException {
+    super(content, keyField);
+  }
+
+  /**
+   * Materialize the record key field.
+   *
+   * @param keyField - Key field in the schema
+   * @param keyBytes - Key byte array
+   * @param record   - Record to materialize
+   */
+  @Override
+  protected void materializeRecordIfNeeded(final Option<String> keyField, final ByteBuffer keyBytes, R record) {
+    if (!keyField.isPresent()) {

Review comment:
       may I know why this could happen? 
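   For context on what this hunk does: the read path restores the trimmed key field from the HFile cell key before handing the record to callers. A simplified stand-in sketch (a plain `Map` plays the role of the record, and `KeyMaterializer` is a hypothetical helper name, not the PR's code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for materializeRecordIfNeeded: a record
// whose key field was trimmed on write gets the key copied back in from the
// HFile cell key before the record reaches the caller.
class KeyMaterializer {
  private final String keyFieldName;

  KeyMaterializer(String keyFieldName) {
    this.keyFieldName = keyFieldName;
  }

  // Restores the key field from the cell key when the payload copy is missing.
  Map<String, Object> materialize(String cellKey, Map<String, Object> record) {
    if (record.get(keyFieldName) == null) {
      Map<String, Object> restored = new HashMap<>(record);
      restored.put(keyFieldName, cellKey); // re-materialize from the cell key
      return restored;
    }
    return record; // key already present; nothing to do
  }
}
```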




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

