This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e483f7c [HUDI-2902] Fixing populate meta fields with Hfile writers and Disabling virtual keys by default for metadata table (#4194)
e483f7c is described below
commit e483f7c776ed53015d08b78fc33729892dc2143c
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Dec 3 07:20:21 2021 -0500
[HUDI-2902] Fixing populate meta fields with Hfile writers and Disabling virtual keys by default for metadata table (#4194)
---
.../apache/hudi/io/HoodieSortedMergeHandle.java | 3 +-
.../hudi/io/storage/HoodieFileWriterFactory.java | 2 +-
.../apache/hudi/io/storage/HoodieHFileWriter.java | 14 +-
.../io/storage/TestHoodieHFileReaderWriter.java | 55 ++++++--
.../resources/exampleSchemaWithMetaFields.avsc | 66 +++++++++
.../functional/TestHoodieBackedMetadata.java | 157 +++++++++++++++++++++
.../client/functional/TestHoodieMetadataBase.java | 2 +-
.../hudi/common/config/HoodieMetadataConfig.java | 12 +-
.../HoodieMetadataMergedLogRecordReader.java | 4 +-
...inuousModeWithMultipleWriters.COPY_ON_WRITE.zip | Bin 2592485 -> 2592484 bytes
...inuousModeWithMultipleWriters.MERGE_ON_READ.zip | Bin 3015940 -> 3015939 bytes
11 files changed, 287 insertions(+), 28 deletions(-)
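With this change, HoodieMetadataConfig.POPULATE_META_FIELDS defaults to true, so the metadata table writes meta fields unless virtual keys are explicitly re-enabled. A minimal sketch (not part of this patch; basePath is a placeholder) of how a writer might opt back into virtual keys for the metadata table, using the builder calls exercised in the tests below:

    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withPopulateMetaFields(false)   // opt back into virtual keys for the metadata table
        .build();
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)              // placeholder base path
        .withMetadataConfig(metadataConfig)
        .build();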
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 606e63a..533611d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
@@ -72,7 +73,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
*/
@Override
public void write(GenericRecord oldRecord) {
-    String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+    String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
// To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
// the oldRecord's key.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index c6d4540..0b6afd4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -89,7 +89,7 @@ public class HoodieFileWriterFactory {
config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
-    return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier);
+    return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields());
}
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index d7e01c5..a719bcb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -62,6 +62,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
private final long maxFileSize;
private final String instantTime;
private final TaskContextSupplier taskContextSupplier;
+ private final boolean populateMetaFields;
private HFile.Writer writer;
private String minRecordKey;
private String maxRecordKey;
@@ -70,7 +71,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
private static String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";
public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileConfig, Schema schema,
-                          TaskContextSupplier taskContextSupplier) throws IOException {
+                          TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException {
Configuration conf = FSUtils.registerFileSystem(file, hfileConfig.getHadoopConf());
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
@@ -84,6 +85,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
this.maxFileSize = hfileConfig.getMaxFileSize();
this.instantTime = instantTime;
this.taskContextSupplier = taskContextSupplier;
+ this.populateMetaFields = populateMetaFields;
HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
.withCompression(hfileConfig.getCompressionAlgorithm())
@@ -104,9 +106,13 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
-    prepRecordWithMetadata(avroRecord, record, instantTime,
-        taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
- writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+ if (populateMetaFields) {
+      prepRecordWithMetadata(avroRecord, record, instantTime,
+          taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
+ writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+ } else {
+ writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+ }
}
@Override
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index c087607..86a0886 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -22,6 +22,9 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -34,19 +37,24 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -55,6 +63,9 @@ import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COM
import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.when;
public class TestHoodieHFileReaderWriter {
@TempDir File tempDir;
@@ -73,21 +84,34 @@ public class TestHoodieHFileReaderWriter {
}
}
-  private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception {
+ private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
+ return Arrays.stream(new Boolean[][] {
+ {true, true},
+ {false, true},
+ {true, false},
+ {false, false}
+ }).map(Arguments::of);
+ }
+
+  private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateMetaFields) throws Exception {
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
Configuration conf = new Configuration();
TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
+ Supplier<Integer> partitionSupplier = Mockito.mock(Supplier.class);
+    when(mockTaskContextSupplier.getPartitionIdSupplier()).thenReturn(partitionSupplier);
+ when(partitionSupplier.get()).thenReturn(10);
String instantTime = "000";
HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
    PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
-    return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier);
+    return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields);
}
- @Test
- public void testWriteReadHFile() throws Exception {
-    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
- HoodieHFileWriter writer = createHFileWriter(avroSchema);
+ @ParameterizedTest
+ @MethodSource("populateMetaFieldsAndTestAvroWithMeta")
+  public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithMeta) throws Exception {
+    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
+    HoodieHFileWriter writer = createHFileWriter(avroSchema, populateMetaFields);
List<String> keys = new ArrayList<>();
Map<String, GenericRecord> recordMap = new HashMap<>();
for (int i = 0; i < 100; i++) {
@@ -97,7 +121,13 @@ public class TestHoodieHFileReaderWriter {
keys.add(key);
record.put("time", Integer.toString(RANDOM.nextInt()));
record.put("number", i);
- writer.writeAvro(key, record);
+ if (testAvroWithMeta) {
+        writer.writeAvroWithMetadata(record, new HoodieRecord(new HoodieKey((String) record.get("_row_key"),
+            Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters
+        // only HoodieKey will be looked up from the 2nd arg(HoodieRecord).
+ } else {
+ writer.writeAvro(key, record);
+ }
recordMap.put(key, record);
}
writer.close();
@@ -109,8 +139,8 @@ public class TestHoodieHFileReaderWriter {
records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
hoodieHFileReader.close();
- for (int i = 0; i < 20; i++) {
- int randomRowstoFetch = 5 + RANDOM.nextInt(50);
+ for (int i = 0; i < 2; i++) {
+ int randomRowstoFetch = 5 + RANDOM.nextInt(10);
Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
List<String> rowsList = new ArrayList<>(rowsToFetch);
Collections.sort(rowsList);
@@ -119,6 +149,11 @@ public class TestHoodieHFileReaderWriter {
assertEquals(result.size(), randomRowstoFetch);
result.forEach(entry -> {
assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
+ if (populateMetaFields && testAvroWithMeta) {
+          assertNotNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+        } else {
+          assertNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ }
});
hoodieHFileReader.close();
}
diff --git a/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithMetaFields.avsc b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithMetaFields.avsc
new file mode 100644
index 0000000..c3fa822
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithMetaFields.avsc
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "example.schema",
+ "type": "record",
+ "name": "trip",
+ "fields": [
+ {
+ "name": "_hoodie_commit_time",
+ "type": ["null","string"],
+ "default":null
+ },
+ {
+ "name": "_hoodie_commit_seqno",
+ "type": ["null","string"],
+ "default":null
+ },
+ {
+ "name": "_hoodie_record_key",
+ "type": ["null","string"],
+ "default":null
+ },
+ {
+ "name": "_hoodie_partition_path",
+ "type": ["null","string"],
+ "default":null
+ },
+ {
+ "name": "_hoodie_file_name",
+ "type": ["null","string"],
+ "default":null
+ },
+ {
+ "name": "_hoodie_operation",
+ "type": ["null","string"],
+ "default":null
+ },
+ {
+ "name": "_row_key",
+ "type": "string"
+ },
+ {
+ "name": "time",
+ "type": "string"
+ },
+ {
+ "name": "number",
+ "type": ["int", "null"]
+ }
+ ]
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 73b7811..ed245da 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client.functional;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -29,6 +30,8 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -41,6 +44,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -60,12 +64,18 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
@@ -75,9 +85,13 @@ import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -120,10 +134,12 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -317,6 +333,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
/**
* Tests that table services in data table won't trigger table services in metadata table.
+ *
* @throws Exception
*/
@Test
@@ -346,6 +363,56 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
}
+
+ /**
+   * Tests that virtual key configs are honored in base files after compaction in metadata table.
+ *
+ * @throws Exception
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+  public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Exception {
+ HoodieTableType tableType = MERGE_ON_READ;
+ init(tableType, false);
+ writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .enableFullScan(true)
+ .enableMetrics(false)
+ .withPopulateMetaFields(populateMetaFields)
+ .withMaxNumDeltaCommitsBeforeCompaction(2)
+ .build()).build();
+ initWriteConfigAndMetatableWriter(writeConfig, true);
+
+ doWriteOperation(testTable, "0000001", INSERT);
+ doClean(testTable, "0000003", Arrays.asList("0000001"));
+ // this should have triggered compaction in metadata table
+ doWriteOperation(testTable, "0000004", UPSERT);
+
+ HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
+ assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
+ assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
+
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig);
+ metadataMetaClient.reloadActiveTimeline();
+
+    HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient);
+ table.getHoodieView().sync();
+    List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
+ HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
+    HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()),
+ new CacheConfig(context.getHadoopConf().get()));
+    List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+ records.forEach(entry -> {
+ if (populateMetaFields) {
+        assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ } else {
+        assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ }
+ });
+ }
+
/**
* Test rollback of various table operations sync to Metadata Table correctly.
*/
@@ -586,6 +653,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
* Tests the metadata payload spurious deletes.
* Let's say a commit was applied to the metadata table and later was explicitly rolled back. Due to spark task failures, there could be more files in the rollback
* metadata than in the original commit metadata. When the payload consistency check is enabled, it will throw an exception. If not, it will succeed.
+ *
* @throws Exception
*/
@ParameterizedTest
@@ -1308,6 +1376,95 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
}
+ /**
+   * Fetching the WriteConfig for the metadata table from the data table's writeConfig is not trivial, and the method that does it is not public in the source code. So, for now,
+   * we use this method, which mimics the source code.
+   * @param writeConfig
+   * @return
+  private HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) {
+ int parallelism = writeConfig.getMetadataInsertParallelism();
+
+    int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
+    int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep());
+
+    // Create the write config for the metadata table by borrowing options from the main write config.
+ HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+ .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+ .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
+            .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
+            .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
+            .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
+            .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
+ .build())
+ .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
+ .withAutoCommit(true)
+ .withAvroSchemaValidate(true)
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withRollbackUsingMarkers(false)
+        .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
+ .withSchema(HoodieMetadataRecord.getClassSchema().toString())
+ .forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withAsyncClean(writeConfig.isMetadataAsyncClean())
+ // we will trigger cleaning manually, to control the instant times
+ .withAutoClean(false)
+ .withCleanerParallelism(parallelism)
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
+ .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
+            // we will trigger compaction manually, to control the instant times
+ .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
+ .withParallelism(parallelism, parallelism)
+ .withDeleteParallelism(parallelism)
+ .withRollbackParallelism(parallelism)
+ .withFinalizeWriteParallelism(parallelism)
+ .withAllowMultiWriteOnSameInstant(true)
+        .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+        .withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields());
+
+ // RecordKey properties are needed for the metadata table records
+ final Properties properties = new Properties();
+    properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
+    properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
+ builder.withProperties(properties);
+
+ if (writeConfig.isMetricsOn()) {
+ builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
+ .withReporterType(writeConfig.getMetricsReporterType().toString())
+ .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
+ .on(true).build());
+ switch (writeConfig.getMetricsReporterType()) {
+ case GRAPHITE:
+          builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
+ .onGraphitePort(writeConfig.getGraphiteServerPort())
+ .toGraphiteHost(writeConfig.getGraphiteServerHost())
+ .usePrefix(writeConfig.getGraphiteMetricPrefix()).build());
+ break;
+ case JMX:
+ builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder()
+ .onJmxPort(writeConfig.getJmxPort())
+ .toJmxHost(writeConfig.getJmxHost())
+ .build());
+ break;
+ case DATADOG:
+ case PROMETHEUS:
+ case PROMETHEUS_PUSHGATEWAY:
+ case CONSOLE:
+ case INMEMORY:
+ case CLOUDWATCH:
+ break;
+ default:
+          throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
+ }
+ }
+ return builder.build();
+ }
+
private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception {
doPreBootstrapOperations(testTable, "0000001", "0000002");
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 0a45d31..25dfd29 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -292,7 +292,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.enable(useFileListingMetadata)
.enableFullScan(enableFullScan)
.enableMetrics(enableMetrics)
- .withPopulateMetaFields(false)
+            .withPopulateMetaFields(HoodieMetadataConfig.POPULATE_META_FIELDS.defaultValue())
.ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
.build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 810e475..51791c9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -118,23 +118,15 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Parallelism to use, when listing the table on lake
storage.");
- public static final ConfigProperty<Boolean> ENABLE_INLINE_READING =
ConfigProperty
- .key(METADATA_PREFIX + ".enable.inline.reading")
- .defaultValue(true)
- .sinceVersion("0.10.0")
-      .withDocumentation("Enable inline reading of Log files. By default log block contents are read as byte[] using regular input stream and records "
-          + "are deserialized from it. Enabling this will read each log block as an inline file and read records from the same. For instance, "
-          + "for HFileDataBlock, a inline file will be read using HFileReader.");
-
public static final ConfigProperty<Boolean> ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty
.key(METADATA_PREFIX + ".enable.full.scan.log.files")
.defaultValue(true)
.sinceVersion("0.10.0")
.withDocumentation("Enable full scanning of log files while reading log
records. If disabled, hudi does look up of only interested entries.");
- public static final ConfigProperty<String> POPULATE_META_FIELDS =
ConfigProperty
+ public static final ConfigProperty<Boolean> POPULATE_META_FIELDS =
ConfigProperty
.key(METADATA_PREFIX + ".populate.meta.fields")
- .defaultValue("false")
+ .defaultValue(true)
.sinceVersion("0.10.0")
.withDocumentation("When enabled, populates all meta fields. When
disabled, no meta fields are populated.");
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
index 2c9ca39..e635eea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
@@ -28,6 +28,8 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.log4j.LogManager;
@@ -142,7 +144,7 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
*/
public static class Builder extends HoodieMergedLogRecordScanner.Builder {
private Set<String> mergeKeyFilter = Collections.emptySet();
- private boolean enableFullScan;
+    private boolean enableFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
private boolean enableInlineReading;
@Override
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip
index 42c0301..299b070 100644
Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip differ
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip
index 8e57b1d..d80439d 100644
Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip differ