nsivabalan commented on code in PR #13295: URL: https://github.com/apache/hudi/pull/13295#discussion_r2112791251
########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java: ##########
@@ -0,0 +1,414 @@
+      // validate that the commit is still pending since we are streaming writes to the metadata table.
+      HoodieActiveTimeline reloadedMdtActiveTimeline = metadataMetaClient.reloadActiveTimeline();
+      assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(), 3); // FILES and RLI instantiation, plus 1 write to the data table.
+      String finalCommitTimeOfInterest = commitTimeOfInterest;
+      assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant -> instant.requestedTime().equals(finalCommitTimeOfInterest)));

Review Comment:
   I mean, when Hudi tries to create another inflight file for an existing commit, it will fail by default. "_.hoodie.allow.multi.write.on.same.instant" dictates this, and the default value for this config is false.
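   To illustrate the behavior described above, here is a minimal sketch only (not the actual Hudi code path; the class, method, and variable names are hypothetical): with the quoted config at its default of false, a second attempt to create an inflight file for the same instant is rejected.

```java
import java.util.Properties;

// Hypothetical sketch of the guard behavior described in the comment above.
class InflightGuardSketch {
  // Config key quoted in the review comment; its default value is false.
  static final String ALLOW_MULTI_WRITE_KEY = "hoodie.allow.multi.write.on.same.instant";

  static void ensureCanCreateInflight(Properties props, boolean inflightAlreadyExists, String instantTime) {
    boolean allowMultiWrite =
        Boolean.parseBoolean(props.getProperty(ALLOW_MULTI_WRITE_KEY, "false"));
    if (inflightAlreadyExists && !allowMultiWrite) {
      // Default behavior: creating a second inflight for the same instant fails.
      throw new IllegalStateException("Inflight file already exists for instant " + instantTime);
    }
  }
}
```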
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertCommitActionExecutor.java: ##########
@@ -0,0 +1,103 @@
+  protected void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
+      throws HoodieCommitException {
+    // With streaming-writes support, we might write to the metadata table multiple times for the same instant time,
+    // i.e. writeClient.startCommit(t1), writeClient.upsert(batch1, t1), writeClient.upsert(batch2, t1), writeClient.commit(t1, ...).
+    // So we generate the inflight file only in the last known write, which we know will only have the FILES partition.
+    if (!initialCall) {

Review Comment:
   It does not really matter. My initial intention was to create the inflight just when adding to the FILES partition, but logically speaking, we should add the inflight when we are writing the first batch of records to the MDT. So I can flip this.
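   For reference, the streaming-write sequence being discussed, sketched with the client calls used in the test added by this PR. This is an illustrative fragment, not a complete test: it assumes the test's client, jsc, metadataMetaClient, prepared batches (batch1, batch2), and fileGroupIds are already in scope, and t1 is just a local name for the shared instant time.

```java
// One delta commit on the metadata table receives several upsert batches.
String t1 = client.createNewInstantTime();
client.startCommitForMetadataTable(metadataMetaClient, t1, DELTA_COMMIT_ACTION);

// Each batch targets the same instant; per the comment above, the inflight file
// would be created on the first batch rather than deferred to the last one.
List<WriteStatus> batch1Statuses =
    client.upsertPreppedRecords(jsc.parallelize(batch1), t1, Option.of(fileGroupIds)).collect();
List<WriteStatus> batch2Statuses =
    client.upsertPreppedRecords(jsc.parallelize(batch2), t1, Option.of(fileGroupIds)).collect();

// The commit is completed once, after all partial write statuses are gathered.
List<HoodieWriteStat> allStats = new ArrayList<>();
batch1Statuses.forEach(ws -> allStats.add(ws.getStat()));
batch2Statuses.forEach(ws -> allStats.add(ws.getStat()));
client.commitStats(t1, allStats, Option.empty(), DELTA_COMMIT_ACTION);
```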
########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java: ##########
@@ -0,0 +1,414 @@
+      mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), fileSlices.stream().map(fileSlice -> fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {

Review Comment:
   sure.
########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java: ##########
@@ -0,0 +1,414 @@
+    try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      FileSlice filesFileSlice =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath()).get(0);

Review Comment:
   ack
########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java: ##########
@@ -0,0 +1,414 @@
+      mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), fileSlices.stream().map(fileSlice -> fileSlice.getFileId()).collect(Collectors.toList()));
+      fileSlices.stream().forEach(fileSlice -> {

Review Comment:
   sure.
########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java: ##########
@@ -0,0 +1,414 @@
+  private void fetchMetadataFileSliceInfo(HoodieTableMetaClient metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList,
+                                          List<HoodieFileGroupId> nonFilesPartitionsFileGroupIdList, Map<String, List<String>> mdtPartitionsFileIdMapping) {

Review Comment:
   I had to return two to three lists or a map, so I took this route.
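   One possible alternative shape for this helper, sketched as a small result holder instead of caller-supplied output lists and a map. The class and field names below are hypothetical and not part of the PR; only HoodieFileGroupId comes from the code under review.

```java
import java.util.List;
import java.util.Map;

import org.apache.hudi.common.model.HoodieFileGroupId;

// Hypothetical result holder: the method would build and return this
// instead of populating the three collections passed in by the caller.
class MetadataFileSliceInfo {
  final List<HoodieFileGroupId> filesPartitionFileGroupIds;
  final List<HoodieFileGroupId> nonFilesPartitionFileGroupIds;
  final Map<String, List<String>> partitionToFileIds;

  MetadataFileSliceInfo(List<HoodieFileGroupId> filesPartitionFileGroupIds,
                        List<HoodieFileGroupId> nonFilesPartitionFileGroupIds,
                        Map<String, List<String>> partitionToFileIds) {
    this.filesPartitionFileGroupIds = filesPartitionFileGroupIds;
    this.nonFilesPartitionFileGroupIds = nonFilesPartitionFileGroupIds;
    this.partitionToFileIds = partitionToFileIds;
  }
}
```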
+ */ + +package org.apache.hudi.client; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.common.config.HoodieCommonConfig; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.io.storage.HoodieSeekingFileReader; +import org.apache.hudi.metadata.HoodieMetadataLogRecordReader; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieMetadataWriteUtils; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; +import static org.apache.hudi.common.util.CollectionUtils.toStream; +import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER; +import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase { + + private Random random; + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + random = new Random(0xDEED); + } + + @Test + public void testWritesViaMetadataWriteClient() throws Exception { + + HoodieWriteConfig hoodieWriteConfig = getConfigBuilder() + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true) + .withRecordIndexFileGroupCount(1, 1).build()).build(); + + // trigger end to end write to data 
table so that metadata table is also initialized. + initDataTableWithACommit(hoodieWriteConfig); + + // fetch metadata file slice info + HoodieWriteConfig mdtWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, HoodieFailedWritesCleaningPolicy.EAGER); + Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>(); + List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new ArrayList<>(); + List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>(); + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf()).build(); + fetchMetadataFileSliceInfo(metadataMetaClient, filesPartitionFileGroupIdList, nonFilesPartitionFileGroupIdList, mdtPartitionsFileIdMapping); + + List<HoodieRecord> filesPartitionExpectedRecords = new ArrayList<>(); + Map<String, HoodieRecord> filesPartitionExpectedRecordsMap = new HashMap<>(); + List<String> expectedAllPartitions = new ArrayList<>(); + List<HoodieRecord> rliRecords = new ArrayList<>(); + Map<String, HoodieRecord> rliPartitionExpectedRecordsMap = new HashMap<>(); + String commitTimeOfInterest = null; + + // create Write client to SparkRDDMetadataWriteClient and trigger writes. + try (SparkRDDMetadataWriteClient client = new SparkRDDMetadataWriteClient(context, mdtWriteConfig)) { + commitTimeOfInterest = client.createNewInstantTime(); + + // prepare FILES partition records. + prepareFilesPartitionRecords(mdtPartitionsFileIdMapping.get(MetadataPartitionType.FILES.getPartitionPath()).get(0), + commitTimeOfInterest, filesPartitionExpectedRecords, filesPartitionExpectedRecordsMap, expectedAllPartitions); + + // prepare RLI records. + prepareRliRecords(commitTimeOfInterest, mdtPartitionsFileIdMapping, expectedAllPartitions, rliRecords, rliPartitionExpectedRecordsMap); + + // ingest RLI records to metadata table. + client.startCommitForMetadataTable(metadataMetaClient, commitTimeOfInterest, DELTA_COMMIT_ACTION); + JavaRDD<WriteStatus> partialWriteStatusesRDD = client.upsertPreppedRecords(jsc.parallelize(rliRecords), commitTimeOfInterest, Option.of(nonFilesPartitionFileGroupIdList)); + List<WriteStatus> partialWriteStatuses = partialWriteStatusesRDD.collect(); + + // validate that the commit is still pending since we are streaming write to metadata table. + HoodieActiveTimeline reloadedMdtActiveTimeline = metadataMetaClient.reloadActiveTimeline(); + assertEquals(reloadedMdtActiveTimeline.filterCompletedInstants().getInstants().stream().count(), 3); // files, rli instantiaton and 1 write to data table. 
+ String finalCommitTimeOfInterest = commitTimeOfInterest; + assertTrue(reloadedMdtActiveTimeline.filterInflightsAndRequested().getInstants().stream().anyMatch(instant -> instant.requestedTime().equals(finalCommitTimeOfInterest))); + + // write to FILES partition + JavaRDD<WriteStatus> filePartitionWriteStatusesRDD = client.upsertPreppedRecords(jsc.parallelize(filesPartitionExpectedRecords), commitTimeOfInterest, Option.of(filesPartitionFileGroupIdList)); + List<WriteStatus> filesPartitionWriteStatus = filePartitionWriteStatusesRDD.collect(); + List<HoodieWriteStat> allWriteStats = new ArrayList<>(); + allWriteStats.addAll(partialWriteStatuses.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList())); + allWriteStats.addAll(filesPartitionWriteStatus.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList())); + client.commitStats(commitTimeOfInterest, allWriteStats, Option.empty(), DELTA_COMMIT_ACTION); + } + + // validate + readFromMetadataTableAndValidateRecords(metadataMetaClient, hoodieWriteConfig, filesPartitionExpectedRecordsMap, rliPartitionExpectedRecordsMap, commitTimeOfInterest); + } + + private void initDataTableWithACommit(HoodieWriteConfig hoodieWriteConfig) throws Exception { + try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) { + final int numRecords = 100; + String newCommitTime = client.createNewInstantTime(); + insertBatch(hoodieWriteConfig, client, newCommitTime, HoodieTimeline.INIT_INSTANT_TS, numRecords, SparkRDDWriteClient::insert, + false, true, numRecords, numRecords, 1, Option.empty(), INSTANT_GENERATOR); + } + } + + private void fetchMetadataFileSliceInfo(HoodieTableMetaClient metadataMetaClient, List<HoodieFileGroupId> filesPartitionFileGroupIdList, + List<HoodieFileGroupId> nonFilesPartitionsFileGroupIdList, Map<String, List<String>> mdtPartitionsFileIdMapping) { + try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) { + List<FileSlice> fileSlices = + HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath()); + mdtPartitionsFileIdMapping.put(MetadataPartitionType.FILES.getPartitionPath(), fileSlices.stream().map(fileSlice -> fileSlice.getFileId()).collect(Collectors.toList())); + fileSlices.stream().forEach(fileSlice -> { + filesPartitionFileGroupIdList.add(new HoodieFileGroupId(MetadataPartitionType.FILES.getPartitionPath(), fileSlice.getFileId())); + }); + + fileSlices = + HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.RECORD_INDEX.getPartitionPath()); + mdtPartitionsFileIdMapping.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), fileSlices.stream().map(fileSlice -> fileSlice.getFileId()).collect(Collectors.toList())); + fileSlices.stream().forEach(fileSlice -> { + nonFilesPartitionsFileGroupIdList.add(new HoodieFileGroupId(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), fileSlice.getFileId())); + }); + } + } + + private void readFromMetadataTableAndValidateRecords(HoodieTableMetaClient metadataMetaClient, HoodieWriteConfig hoodieWriteConfig, + Map<String, HoodieRecord> filesPartitionExpectedRecordsMap, + Map<String, HoodieRecord> rliPartitionExpectedRecordsMap, + String validMetadataInstant) throws IOException { + // read from MDT and validate all records. 
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+    try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      FileSlice filesFileSlice =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.FILES.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, filesFileSlice, filesPartitionExpectedRecordsMap, validMetadataInstant);
+
+      FileSlice rliFileSlice =
+          HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), MetadataPartitionType.RECORD_INDEX.getPartitionPath()).get(0);
+      readFromMDTFileSliceAndValidate(metadataMetaClient, hoodieWriteConfig, rliFileSlice, rliPartitionExpectedRecordsMap, validMetadataInstant);
+    }
+  }
+
+  private void readFromMDTFileSliceAndValidate(HoodieTableMetaClient metadataMetaClient, HoodieWriteConfig hoodieWriteConfig, FileSlice fileSlice, Map<String, HoodieRecord> expectedRecordsMap,
+                                               String validMetadataInstant)
+      throws IOException {
+    // open readers
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = openReaders(MetadataPartitionType.FILES.getPartitionPath(), fileSlice, metaClient, metadataMetaClient, hoodieWriteConfig,
+        validMetadataInstant);
+    try {
+      // read from the file slice of interest.
+      List<String> sortedKeysForFilesPartition = new ArrayList<>(expectedRecordsMap.keySet());
+      Collections.sort(sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = readLogRecords(readers.getRight(), sortedKeysForFilesPartition);
+
+      Map<String, HoodieRecord<HoodieMetadataPayload>> actualMdtRecordMap =
+          readFromBaseAndMergeWithLogRecords(readers.getKey(), sortedKeysForFilesPartition, logRecords, fileSlice.getPartitionPath());
+      assertEquals(actualMdtRecordMap.size(), expectedRecordsMap.size());
+      actualMdtRecordMap.forEach((k, v) -> {

Review Comment: ack

+        assertTrue(expectedRecordsMap.containsKey(k));
+        if (!k.equals(RECORDKEY_PARTITION_LIST)) {
+          // ignore __all_partition_records since it could have partitions from the first commit, which could be from HoodieTestDataGenerator.
+ assertEquals(((HoodieMetadataPayload) expectedRecordsMap.get(k).getData()).getFilenames(), ((HoodieMetadataPayload) v.getData()).getFilenames()); + } + }); + } finally { + if (readers.getKey() != null) { + readers.getKey().close(); + } + if (readers.getValue() != null) { + readers.getValue().close(); + } + } + } + + private Map<String, HoodieRecord<HoodieMetadataPayload>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader, + List<String> sortedKeys) { + if (logRecordReader == null) { + return Collections.emptyMap(); + } + + return logRecordReader.getRecordsByKeys(sortedKeys); + } + + private Map<String, HoodieRecord<HoodieMetadataPayload>> readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader, + List<String> sortedKeys, + Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords, + String partitionName) throws IOException { + if (reader == null) { + // No base file at all + return logRecords; + } + + Map<String, HoodieRecord<HoodieMetadataPayload>> records = + fetchBaseFileRecordsByKeys(reader, sortedKeys, partitionName); + + // Iterate over all provided log-records, merging them into existing records + logRecords.values().forEach(logRecord -> + records.merge( + logRecord.getRecordKey(), + logRecord, + (oldRecord, newRecord) -> { + HoodieMetadataPayload mergedPayload = newRecord.getData().preCombine(oldRecord.getData()); + return mergedPayload.isDeleted() ? null : new HoodieAvroRecord<>(oldRecord.getKey(), mergedPayload); + } + )); + + return records; + } + + @SuppressWarnings("unchecked") + private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader, + List<String> sortedKeys, + String partitionName) throws IOException { + Map<String, HoodieRecord<HoodieMetadataPayload>> result; + try (ClosableIterator<HoodieRecord<?>> records = reader.getRecordsByKeysIterator(sortedKeys)) { + result = toStream(records) + .map(record -> { + GenericRecord data = (GenericRecord) record.getData(); + // populateMetaFields is hardcoded to false for the metadata table so key must be extracted from the `key` field + String recordKey = (String) data.get(HoodieMetadataPayload.KEY_FIELD_NAME); + return Pair.of(recordKey, composeRecord(data, recordKey, partitionName)); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + } + return result; + } + + private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String recordKey, String partitionName) { + return new HoodieAvroRecord<>(new HoodieKey(recordKey, partitionName), + new HoodieMetadataPayload(avroRecord, 0L), null); + } + + private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> openReaders(String partitionName, FileSlice slice, HoodieTableMetaClient dataMetaClient, + HoodieTableMetaClient metadataMetaClient, HoodieWriteConfig datatableWriteConfig, + String validMetadataInstant) { + try { + HoodieSeekingFileReader<?> baseFileReader = getBaseFileReader(slice, metadataMetaClient); + List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList()); + HoodieMetadataLogRecordReader logRecordScanner = getLogRecordScanner(logFiles, partitionName, dataMetaClient, metadataMetaClient, datatableWriteConfig.getMetadataConfig(), + validMetadataInstant); + return Pair.of(baseFileReader, logRecordScanner); + } catch (IOException e) { + throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e); + } + } + + private HoodieSeekingFileReader<?> getBaseFileReader(FileSlice slice, 
HoodieTableMetaClient metadataMetaClient) throws IOException {

Review Comment: yes, exactly. In fact, it slipped my mind and I used the FG reader to directly read from a file group in the MDT and ran into exceptions :) and then had to pull in so much code.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
