This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch release-0.10.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7de5d1760191128c86ef40ac2f9cb57e87328e3e Author: Sivabalan Narayanan <[email protected]> AuthorDate: Fri Dec 3 07:20:21 2021 -0500 [HUDI-2902] Fixing populate meta fields with Hfile writers and Disabling virtual keys by default for metadata table (#4194) (cherry picked from commit e483f7c776ed53015d08b78fc33729892dc2143c) --- .../apache/hudi/io/HoodieSortedMergeHandle.java | 3 +- .../hudi/io/storage/HoodieFileWriterFactory.java | 2 +- .../apache/hudi/io/storage/HoodieHFileWriter.java | 14 +- .../io/storage/TestHoodieHFileReaderWriter.java | 55 ++++++-- .../resources/exampleSchemaWithMetaFields.avsc | 66 +++++++++ .../functional/TestHoodieBackedMetadata.java | 157 +++++++++++++++++++++ .../client/functional/TestHoodieMetadataBase.java | 2 +- .../hudi/common/config/HoodieMetadataConfig.java | 12 +- .../HoodieMetadataMergedLogRecordReader.java | 4 +- ...inuousModeWithMultipleWriters.COPY_ON_WRITE.zip | Bin 2592485 -> 2592484 bytes ...inuousModeWithMultipleWriters.MERGE_ON_READ.zip | Bin 3015940 -> 3015939 bytes 11 files changed, 287 insertions(+), 28 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 606e63a..533611d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; import org.apache.avro.generic.GenericRecord; @@ -72,7 +73,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext */ @Override public void write(GenericRecord oldRecord) { - String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt); // To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than // the oldRecord's key. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index c6d4540..0b6afd4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -89,7 +89,7 @@ public class HoodieFileWriterFactory { config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(), PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); - return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier); + return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields()); } private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java index d7e01c5..a719bcb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java @@ -62,6 +62,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR private final long maxFileSize; private final String instantTime; private final TaskContextSupplier taskContextSupplier; + private final boolean populateMetaFields; private HFile.Writer writer; private String minRecordKey; private String maxRecordKey; @@ -70,7 +71,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR private static String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction"; public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileConfig, Schema schema, - TaskContextSupplier taskContextSupplier) throws IOException { + TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException { Configuration conf = FSUtils.registerFileSystem(file, hfileConfig.getHadoopConf()); this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf); @@ -84,6 +85,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR this.maxFileSize = hfileConfig.getMaxFileSize(); this.instantTime = instantTime; this.taskContextSupplier = taskContextSupplier; + this.populateMetaFields = populateMetaFields; HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize()) .withCompression(hfileConfig.getCompressionAlgorithm()) @@ -104,9 +106,13 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR @Override public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException { - prepRecordWithMetadata(avroRecord, record, instantTime, - taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName()); - writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord); + if (populateMetaFields) { + prepRecordWithMetadata(avroRecord, record, instantTime, + taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName()); + writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord); + } else { + writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord); + } } @Override diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java index c087607..86a0886 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java @@ -22,6 +22,9 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -34,19 +37,24 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.util.Pair; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mockito; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Stream; import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; @@ -55,6 +63,9 @@ import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COM import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR; import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.when; public class TestHoodieHFileReaderWriter { @TempDir File tempDir; @@ -73,21 +84,34 @@ public class TestHoodieHFileReaderWriter { } } - private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception { + private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() { + return Arrays.stream(new Boolean[][] { + {true, true}, + {false, true}, + {true, false}, + {false, false} + }).map(Arguments::of); + } + + private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateMetaFields) throws Exception { BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name()); Configuration conf = new Configuration(); TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class); + Supplier<Integer> partitionSupplier = Mockito.mock(Supplier.class); + when(mockTaskContextSupplier.getPartitionIdSupplier()).thenReturn(partitionSupplier); + when(partitionSupplier.get()).thenReturn(10); String instantTime = "000"; HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); - return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier); + return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields); } - @Test - public void testWriteReadHFile() throws Exception { - Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); - HoodieHFileWriter writer = createHFileWriter(avroSchema); + @ParameterizedTest + @MethodSource("populateMetaFieldsAndTestAvroWithMeta") + public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithMeta) throws Exception { + Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc"); + HoodieHFileWriter writer = createHFileWriter(avroSchema, populateMetaFields); List<String> keys = new ArrayList<>(); Map<String, GenericRecord> recordMap = new HashMap<>(); for (int i = 0; i < 100; i++) { @@ -97,7 +121,13 @@ public class TestHoodieHFileReaderWriter { keys.add(key); record.put("time", Integer.toString(RANDOM.nextInt())); record.put("number", i); - writer.writeAvro(key, record); + if (testAvroWithMeta) { + writer.writeAvroWithMetadata(record, new HoodieRecord(new HoodieKey((String) record.get("_row_key"), + Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters + // only HoodieKey will be looked up from the 2nd arg(HoodieRecord). + } else { + writer.writeAvro(key, record); + } recordMap.put(key, record); } writer.close(); @@ -109,8 +139,8 @@ public class TestHoodieHFileReaderWriter { records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()))); hoodieHFileReader.close(); - for (int i = 0; i < 20; i++) { - int randomRowstoFetch = 5 + RANDOM.nextInt(50); + for (int i = 0; i < 2; i++) { + int randomRowstoFetch = 5 + RANDOM.nextInt(10); Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys); List<String> rowsList = new ArrayList<>(rowsToFetch); Collections.sort(rowsList); @@ -119,6 +149,11 @@ public class TestHoodieHFileReaderWriter { assertEquals(result.size(), randomRowstoFetch); result.forEach(entry -> { assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())); + if (populateMetaFields && testAvroWithMeta) { + assertNotNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } else { + assertNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } }); hoodieHFileReader.close(); } diff --git a/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithMetaFields.avsc b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithMetaFields.avsc new file mode 100644 index 0000000..c3fa822 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithMetaFields.avsc @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "namespace": "example.schema", + "type": "record", + "name": "trip", + "fields": [ + { + "name": "_hoodie_commit_time", + "type": ["null","string"], + "default":null + }, + { + "name": "_hoodie_commit_seqno", + "type": ["null","string"], + "default":null + }, + { + "name": "_hoodie_record_key", + "type": ["null","string"], + "default":null + }, + { + "name": "_hoodie_partition_path", + "type": ["null","string"], + "default":null + }, + { + "name": "_hoodie_file_name", + "type": ["null","string"], + "default":null + }, + { + "name": "_hoodie_operation", + "type": ["null","string"], + "default":null + }, + { + "name": "_row_key", + "type": "string" + }, + { + "name": "time", + "type": "string" + }, + { + "name": "number", + "type": ["int", "null"] + } + ] +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 73b7811..ed245da 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,6 +18,7 @@ package org.apache.hudi.client.functional; +import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -29,6 +30,8 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; @@ -41,6 +44,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -60,12 +64,18 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; +import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.HoodieMetadataMetrics; +import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; @@ -75,9 +85,13 @@ import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.testutils.MetadataMergeWriteStatus; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.Time; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -120,10 +134,12 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE; import static org.apache.hudi.common.model.WriteOperationType.INSERT; import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -317,6 +333,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { /** * Tests that table services in data table won't trigger table services in metadata table. + * * @throws Exception */ @Test @@ -346,6 +363,56 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001"); } + + /** + * Tests that virtual key configs are honored in base files after compaction in metadata table. + * + * @throws Exception + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Exception { + HoodieTableType tableType = MERGE_ON_READ; + init(tableType, false); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .enableFullScan(true) + .enableMetrics(false) + .withPopulateMetaFields(populateMetaFields) + .withMaxNumDeltaCommitsBeforeCompaction(2) + .build()).build(); + initWriteConfigAndMetatableWriter(writeConfig, true); + + doWriteOperation(testTable, "0000001", INSERT); + doClean(testTable, "0000003", Arrays.asList("0000001")); + // this should have triggered compaction in metadata table + doWriteOperation(testTable, "0000004", UPSERT); + + HoodieTableMetadata tableMetadata = metadata(writeConfig, context); + assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); + assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001"); + + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig); + metadataMetaClient.reloadActiveTimeline(); + + HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient); + table.getHoodieView().sync(); + List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList()); + HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); + HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()), + new CacheConfig(context.getHadoopConf().get())); + List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords(); + records.forEach(entry -> { + if (populateMetaFields) { + assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } else { + assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } + }); + } + /** * Test rollback of various table operations sync to Metadata Table correctly. */ @@ -586,6 +653,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { * Tests the metadata payload spurious deletes. * Lets say a commit was applied to metadata table, and later was explicitly got rolledback. Due to spark task failures, there could be more files in rollback * metadata when compared to the original commit metadata. When payload consistency check is enabled, it will throw exception. If not, it will succeed. + * * @throws Exception */ @ParameterizedTest @@ -1308,6 +1376,95 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } } + /** + * Fetching WriteConfig for metadata table from Data table's writeConfig is not trivial and the method is not public in source code. so, for now, + * using this method which mimics source code. + * @param writeConfig + * @return + */ + private HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) { + int parallelism = writeConfig.getMetadataInsertParallelism(); + + int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep()); + int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep()); + + // Create the write config for the metadata table by borrowing options from the main write config. + HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() + .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled()) + .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs()) + .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs()) + .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks()) + .build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build()) + .withAutoCommit(true) + .withAvroSchemaValidate(true) + .withEmbeddedTimelineServerEnabled(false) + .withMarkersType(MarkerType.DIRECT.name()) + .withRollbackUsingMarkers(false) + .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath())) + .withSchema(HoodieMetadataRecord.getClassSchema().toString()) + .forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withAsyncClean(writeConfig.isMetadataAsyncClean()) + // we will trigger cleaning manually, to control the instant times + .withAutoClean(false) + .withCleanerParallelism(parallelism) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .retainCommits(writeConfig.getMetadataCleanerCommitsRetained()) + .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep) + // we will trigger compaction manually, to control the instant times + .withInlineCompaction(false) + .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build()) + .withParallelism(parallelism, parallelism) + .withDeleteParallelism(parallelism) + .withRollbackParallelism(parallelism) + .withFinalizeWriteParallelism(parallelism) + .withAllowMultiWriteOnSameInstant(true) + .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) + .withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields()); + + // RecordKey properties are needed for the metadata table records + final Properties properties = new Properties(); + properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); + properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); + builder.withProperties(properties); + + if (writeConfig.isMetricsOn()) { + builder.withMetricsConfig(HoodieMetricsConfig.newBuilder() + .withReporterType(writeConfig.getMetricsReporterType().toString()) + .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled()) + .on(true).build()); + switch (writeConfig.getMetricsReporterType()) { + case GRAPHITE: + builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() + .onGraphitePort(writeConfig.getGraphiteServerPort()) + .toGraphiteHost(writeConfig.getGraphiteServerHost()) + .usePrefix(writeConfig.getGraphiteMetricPrefix()).build()); + break; + case JMX: + builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder() + .onJmxPort(writeConfig.getJmxPort()) + .toJmxHost(writeConfig.getJmxHost()) + .build()); + break; + case DATADOG: + case PROMETHEUS: + case PROMETHEUS_PUSHGATEWAY: + case CONSOLE: + case INMEMORY: + case CLOUDWATCH: + break; + default: + throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType()); + } + } + return builder.build(); + } + private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception { doPreBootstrapOperations(testTable, "0000001", "0000002"); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 0a45d31..25dfd29 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -292,7 +292,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness { .enable(useFileListingMetadata) .enableFullScan(enableFullScan) .enableMetrics(enableMetrics) - .withPopulateMetaFields(false) + .withPopulateMetaFields(HoodieMetadataConfig.POPULATE_META_FIELDS.defaultValue()) .ignoreSpuriousDeletes(validateMetadataPayloadConsistency) .build()) .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 810e475..51791c9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -118,23 +118,15 @@ public final class HoodieMetadataConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Parallelism to use, when listing the table on lake storage."); - public static final ConfigProperty<Boolean> ENABLE_INLINE_READING = ConfigProperty - .key(METADATA_PREFIX + ".enable.inline.reading") - .defaultValue(true) - .sinceVersion("0.10.0") - .withDocumentation("Enable inline reading of Log files. By default log block contents are read as byte[] using regular input stream and records " - + "are deserialized from it. Enabling this will read each log block as an inline file and read records from the same. For instance, " - + "for HFileDataBlock, a inline file will be read using HFileReader."); - public static final ConfigProperty<Boolean> ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty .key(METADATA_PREFIX + ".enable.full.scan.log.files") .defaultValue(true) .sinceVersion("0.10.0") .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries."); - public static final ConfigProperty<String> POPULATE_META_FIELDS = ConfigProperty + public static final ConfigProperty<Boolean> POPULATE_META_FIELDS = ConfigProperty .key(METADATA_PREFIX + ".populate.meta.fields") - .defaultValue("false") + .defaultValue(true) .sinceVersion("0.10.0") .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated."); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java index 2c9ca39..e635eea 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java @@ -28,6 +28,8 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; + +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.log4j.LogManager; @@ -142,7 +144,7 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc */ public static class Builder extends HoodieMergedLogRecordScanner.Builder { private Set<String> mergeKeyFilter = Collections.emptySet(); - private boolean enableFullScan; + private boolean enableFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue(); private boolean enableInlineReading; @Override diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip index 42c0301..299b070 100644 Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip differ diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip index 8e57b1d..d80439d 100644 Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip differ
