This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7de5d1760191128c86ef40ac2f9cb57e87328e3e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Dec 3 07:20:21 2021 -0500

    [HUDI-2902] Fixing populate meta fields with Hfile writers and Disabling virtual keys by default for metadata table (#4194)
    
    (cherry picked from commit e483f7c776ed53015d08b78fc33729892dc2143c)
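    
    Note: the net effect of this change is that the metadata table now writes Hudi meta fields by default (POPULATE_META_FIELDS flips to true below), so virtual keys must be requested explicitly. A minimal sketch, not part of this commit, using only builder calls that appear in the diff (basePath and the table name are illustrative placeholders, and other required write options are omitted), of how a write config could opt back into virtual keys for the metadata table:
    
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath(basePath)          // data table base path (placeholder)
            .forTable("trips")           // illustrative table name
            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                .enable(true)
                // new default is true; set false to keep virtual keys in the metadata table
                .withPopulateMetaFields(false)
                .build())
            .build();
    
    The equivalent property key is METADATA_PREFIX + ".populate.meta.fields" (assumed to resolve to hoodie.metadata.populate.meta.fields).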
---
 .../apache/hudi/io/HoodieSortedMergeHandle.java    |   3 +-
 .../hudi/io/storage/HoodieFileWriterFactory.java   |   2 +-
 .../apache/hudi/io/storage/HoodieHFileWriter.java  |  14 +-
 .../io/storage/TestHoodieHFileReaderWriter.java    |  55 ++++++--
 .../resources/exampleSchemaWithMetaFields.avsc     |  66 +++++++++
 .../functional/TestHoodieBackedMetadata.java       | 157 +++++++++++++++++++++
 .../client/functional/TestHoodieMetadataBase.java  |   2 +-
 .../hudi/common/config/HoodieMetadataConfig.java   |  12 +-
 .../HoodieMetadataMergedLogRecordReader.java       |   4 +-
 ...inuousModeWithMultipleWriters.COPY_ON_WRITE.zip | Bin 2592485 -> 2592484 bytes
 ...inuousModeWithMultipleWriters.MERGE_ON_READ.zip | Bin 3015940 -> 3015939 bytes
 11 files changed, 287 insertions(+), 28 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 606e63a..533611d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
@@ -72,7 +73,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
    */
   @Override
   public void write(GenericRecord oldRecord) {
-    String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+    String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
 
     // To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
     // the oldRecord's key.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index c6d4540..0b6afd4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -89,7 +89,7 @@ public class HoodieFileWriterFactory {
         config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
         PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
 
-    return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier);
+    return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields());
   }
 
   private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index d7e01c5..a719bcb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -62,6 +62,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
   private final long maxFileSize;
   private final String instantTime;
   private final TaskContextSupplier taskContextSupplier;
+  private final boolean populateMetaFields;
   private HFile.Writer writer;
   private String minRecordKey;
   private String maxRecordKey;
@@ -70,7 +71,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
   private static String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";
 
   public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileConfig, Schema schema,
-                           TaskContextSupplier taskContextSupplier) throws IOException {
+                           TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException {
 
     Configuration conf = FSUtils.registerFileSystem(file, hfileConfig.getHadoopConf());
     this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
@@ -84,6 +85,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
     this.maxFileSize = hfileConfig.getMaxFileSize();
     this.instantTime = instantTime;
     this.taskContextSupplier = taskContextSupplier;
+    this.populateMetaFields = populateMetaFields;
 
     HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
         .withCompression(hfileConfig.getCompressionAlgorithm())
@@ -104,9 +106,13 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
 
   @Override
   public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
-    prepRecordWithMetadata(avroRecord, record, instantTime,
-        taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
-    writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+    if (populateMetaFields) {
+      prepRecordWithMetadata(avroRecord, record, instantTime,
+          taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
+      writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+    } else {
+      writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index c087607..86a0886 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -22,6 +22,9 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -34,19 +37,24 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -55,6 +63,9 @@ import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COM
 import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
 import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.when;
 
 public class TestHoodieHFileReaderWriter {
   @TempDir File tempDir;
@@ -73,21 +84,34 @@ public class TestHoodieHFileReaderWriter {
     }
   }
 
-  private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception {
+  private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
+    return Arrays.stream(new Boolean[][] {
+        {true, true},
+        {false, true},
+        {true, false},
+        {false, false}
+    }).map(Arguments::of);
+  }
+
+  private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateMetaFields) throws Exception {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
     Configuration conf = new Configuration();
     TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
+    Supplier<Integer> partitionSupplier = Mockito.mock(Supplier.class);
+    when(mockTaskContextSupplier.getPartitionIdSupplier()).thenReturn(partitionSupplier);
+    when(partitionSupplier.get()).thenReturn(10);
     String instantTime = "000";
 
     HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
         PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
-    return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier);
+    return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields);
   }
 
-  @Test
-  public void testWriteReadHFile() throws Exception {
-    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
-    HoodieHFileWriter writer = createHFileWriter(avroSchema);
+  @ParameterizedTest
+  @MethodSource("populateMetaFieldsAndTestAvroWithMeta")
+  public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithMeta) throws Exception {
+    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
+    HoodieHFileWriter writer = createHFileWriter(avroSchema, populateMetaFields);
     List<String> keys = new ArrayList<>();
     Map<String, GenericRecord> recordMap = new HashMap<>();
     for (int i = 0; i < 100; i++) {
@@ -97,7 +121,13 @@ public class TestHoodieHFileReaderWriter {
       keys.add(key);
       record.put("time", Integer.toString(RANDOM.nextInt()));
       record.put("number", i);
-      writer.writeAvro(key, record);
+      if (testAvroWithMeta) {
+        writer.writeAvroWithMetadata(record, new HoodieRecord(new HoodieKey((String) record.get("_row_key"),
+            Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters
+        // only HoodieKey will be looked up from the 2nd arg(HoodieRecord).
+      } else {
+        writer.writeAvro(key, record);
+      }
       recordMap.put(key, record);
     }
     writer.close();
@@ -109,8 +139,8 @@ public class TestHoodieHFileReaderWriter {
     records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
     hoodieHFileReader.close();
 
-    for (int i = 0; i < 20; i++) {
-      int randomRowstoFetch = 5 + RANDOM.nextInt(50);
+    for (int i = 0; i < 2; i++) {
+      int randomRowstoFetch = 5 + RANDOM.nextInt(10);
       Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
       List<String> rowsList = new ArrayList<>(rowsToFetch);
       Collections.sort(rowsList);
@@ -119,6 +149,11 @@ public class TestHoodieHFileReaderWriter {
       assertEquals(result.size(), randomRowstoFetch);
       result.forEach(entry -> {
         assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
+        if (populateMetaFields && testAvroWithMeta) {
+          assertNotNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+        } else {
+          assertNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+        }
       });
       hoodieHFileReader.close();
     }
diff --git a/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithMetaFields.avsc b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithMetaFields.avsc
new file mode 100644
index 0000000..c3fa822
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithMetaFields.avsc
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+    "namespace": "example.schema",
+    "type": "record",
+    "name": "trip",
+    "fields": [
+        {
+                "name": "_hoodie_commit_time",
+                "type": ["null","string"],
+                "default":null
+        },
+        {
+                "name": "_hoodie_commit_seqno",
+                "type": ["null","string"],
+                "default":null
+        },
+        {
+                "name": "_hoodie_record_key",
+                "type": ["null","string"],
+                "default":null
+        },
+        {
+                "name": "_hoodie_partition_path",
+                "type": ["null","string"],
+                "default":null
+        },
+        {
+                "name": "_hoodie_file_name",
+                "type": ["null","string"],
+                "default":null
+        },
+        {
+                "name": "_hoodie_operation",
+                "type": ["null","string"],
+                "default":null
+        },
+        {
+            "name": "_row_key",
+            "type": "string"
+        },
+        {
+            "name": "time",
+            "type": "string"
+        },
+        {
+            "name": "number",
+            "type": ["int", "null"]
+        }
+    ]
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 73b7811..ed245da 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client.functional;
 
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -29,6 +30,8 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -41,6 +44,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -60,12 +64,18 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.storage.HoodieHFileReader;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieMetadataMetrics;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -75,9 +85,13 @@ import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -120,10 +134,12 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -317,6 +333,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
   /**
   * Tests that table services in data table won't trigger table services in metadata table.
+   *
    * @throws Exception
    */
   @Test
@@ -346,6 +363,56 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
   }
 
+
+  /**
+   * Tests that virtual key configs are honored in base files after compaction in metadata table.
+   *
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Exception {
+    HoodieTableType tableType = MERGE_ON_READ;
+    init(tableType, false);
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .enableFullScan(true)
+            .enableMetrics(false)
+            .withPopulateMetaFields(populateMetaFields)
+            .withMaxNumDeltaCommitsBeforeCompaction(2)
+            .build()).build();
+    initWriteConfigAndMetatableWriter(writeConfig, true);
+
+    doWriteOperation(testTable, "0000001", INSERT);
+    doClean(testTable, "0000003", Arrays.asList("0000001"));
+    // this should have triggered compaction in metadata table
+    doWriteOperation(testTable, "0000004", UPSERT);
+
+    HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
+    assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
+    assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
+
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig);
+    metadataMetaClient.reloadActiveTimeline();
+
+    HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient);
+    table.getHoodieView().sync();
+    List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
+    HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
+    HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()),
+        new CacheConfig(context.getHadoopConf().get()));
+    List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+    records.forEach(entry -> {
+      if (populateMetaFields) {
+        assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      } else {
+        assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      }
+    });
+  }
+
   /**
   * Test rollback of various table operations sync to Metadata Table correctly.
   */
@@ -586,6 +653,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
   * Tests the metadata payload spurious deletes.
   * Lets say a commit was applied to metadata table, and later was explicitly got rolledback. Due to spark task failures, there could be more files in rollback
   * metadata when compared to the original commit metadata. When payload consistency check is enabled, it will throw exception. If not, it will succeed.
+   *
    * @throws Exception
    */
   @ParameterizedTest
@@ -1308,6 +1376,95 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
   }
 
+  /**
+   * Fetching WriteConfig for metadata table from Data table's writeConfig is not trivial and the method is not public in source code. so, for now,
+   * using this method which mimics source code.
+   * @param writeConfig
+   * @return
+   */
+  private HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) {
+    int parallelism = writeConfig.getMetadataInsertParallelism();
+
+    int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
+    int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep());
+
+    // Create the write config for the metadata table by borrowing options from the main write config.
+    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
+            .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
+            .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
+            .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
+            .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
+            .build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
+        .withAutoCommit(true)
+        .withAvroSchemaValidate(true)
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withRollbackUsingMarkers(false)
+        .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
+        .withSchema(HoodieMetadataRecord.getClassSchema().toString())
+        .forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withAsyncClean(writeConfig.isMetadataAsyncClean())
+            // we will trigger cleaning manually, to control the instant times
+            .withAutoClean(false)
+            .withCleanerParallelism(parallelism)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
+            .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
+            // we will trigger compaction manually, to control the instant times
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
+        .withParallelism(parallelism, parallelism)
+        .withDeleteParallelism(parallelism)
+        .withRollbackParallelism(parallelism)
+        .withFinalizeWriteParallelism(parallelism)
+        .withAllowMultiWriteOnSameInstant(true)
+        .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+        .withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields());
+
+    // RecordKey properties are needed for the metadata table records
+    final Properties properties = new Properties();
+    properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
+    properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
+    builder.withProperties(properties);
+
+    if (writeConfig.isMetricsOn()) {
+      builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
+          .withReporterType(writeConfig.getMetricsReporterType().toString())
+          .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
+          .on(true).build());
+      switch (writeConfig.getMetricsReporterType()) {
+        case GRAPHITE:
+          builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
+              .onGraphitePort(writeConfig.getGraphiteServerPort())
+              .toGraphiteHost(writeConfig.getGraphiteServerHost())
+              .usePrefix(writeConfig.getGraphiteMetricPrefix()).build());
+          break;
+        case JMX:
+          builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder()
+              .onJmxPort(writeConfig.getJmxPort())
+              .toJmxHost(writeConfig.getJmxHost())
+              .build());
+          break;
+        case DATADOG:
+        case PROMETHEUS:
+        case PROMETHEUS_PUSHGATEWAY:
+        case CONSOLE:
+        case INMEMORY:
+        case CLOUDWATCH:
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
+      }
+    }
+    return builder.build();
+  }
+
   private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception {
     doPreBootstrapOperations(testTable, "0000001", "0000002");
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 0a45d31..25dfd29 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -292,7 +292,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
             .enable(useFileListingMetadata)
             .enableFullScan(enableFullScan)
             .enableMetrics(enableMetrics)
-            .withPopulateMetaFields(false)
+            .withPopulateMetaFields(HoodieMetadataConfig.POPULATE_META_FIELDS.defaultValue())
             .ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
             .build())
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 810e475..51791c9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -118,23 +118,15 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .sinceVersion("0.7.0")
       .withDocumentation("Parallelism to use, when listing the table on lake storage.");
 
-  public static final ConfigProperty<Boolean> ENABLE_INLINE_READING = ConfigProperty
-      .key(METADATA_PREFIX + ".enable.inline.reading")
-      .defaultValue(true)
-      .sinceVersion("0.10.0")
-      .withDocumentation("Enable inline reading of Log files. By default log block contents are read as byte[] using regular input stream and records "
-          + "are deserialized from it. Enabling this will read each log block as an inline file and read records from the same. For instance, "
-          + "for HFileDataBlock, a inline file will be read using HFileReader.");
-
   public static final ConfigProperty<Boolean> ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty
       .key(METADATA_PREFIX + ".enable.full.scan.log.files")
       .defaultValue(true)
       .sinceVersion("0.10.0")
       .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
 
-  public static final ConfigProperty<String> POPULATE_META_FIELDS = ConfigProperty
+  public static final ConfigProperty<Boolean> POPULATE_META_FIELDS = ConfigProperty
       .key(METADATA_PREFIX + ".populate.meta.fields")
-      .defaultValue("false")
+      .defaultValue(true)
       .sinceVersion("0.10.0")
       .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated.");
 
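Note: POPULATE_META_FIELDS is now a ConfigProperty<Boolean> (previously a string-typed property defaulting to "false"), so callers read it as a boolean rather than parsing a string. A small illustrative check, assuming only the accessors used elsewhere in this diff:

    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build();
    boolean populate = metadataConfig.populateMetaFields();                      // expected: true after this change
    boolean dflt = HoodieMetadataConfig.POPULATE_META_FIELDS.defaultValue();      // also true
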
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
index 2c9ca39..e635eea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
@@ -28,6 +28,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.log4j.LogManager;
@@ -142,7 +144,7 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
    */
   public static class Builder extends HoodieMergedLogRecordScanner.Builder {
     private Set<String> mergeKeyFilter = Collections.emptySet();
-    private boolean enableFullScan;
+    private boolean enableFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
     private boolean enableInlineReading;
 
     @Override
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip
index 42c0301..299b070 100644
Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip differ
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip
index 8e57b1d..d80439d 100644
Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip differ
