vamsikarnika commented on code in PR #14260:
URL: https://github.com/apache/hudi/pull/14260#discussion_r2548554997
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends SparkHoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+ private final String sourceBasePath;
+ private String inflightInstantTimestamp;
+
+  public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig,
+                                                     HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                     HoodieEngineContext engineContext,
+                                                     String inflightInstantTimestamp, String sourceBasePath) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, Option.of(inflightInstantTimestamp));
+    this.sourceBasePath = sourceBasePath;
+    this.inflightInstantTimestamp = inflightInstantTimestamp;
+  }
+
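+  /**
+   * Factory for the metadata-sync writer. Illustrative call (argument names here are
+   * placeholders, not identifiers from this PR):
+   * {@code SparkHoodieBackedMetadataSyncMetadataWriter.create(conf, writeConfig, cleaningPolicy, context, instantTime)}.
+   */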
+  public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
+                                                 HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context, String inflightInstantTimestamp) {
+    return new SparkHoodieBackedMetadataSyncMetadataWriter(conf, writeConfig, failedWritesCleaningPolicy, context,
+        inflightInstantTimestamp, writeConfig.getMetadataConfig().getBasePathOverride());
+  }
+
+  @Override
+  protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) throws IOException {
+    // Do not initialize the metadata table during metadata sync; only open the metadata meta client and reader
+    metadataMetaClient = initializeMetaClient();
+    this.metadata = new HoodieBackedTableMetadata(engineContext, dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+    return true;
+  }
+
+  public void bootstrap(Option<String> bootstrapUntilInstantOpt) throws IOException {
+    if (!bootstrapUntilInstantOpt.isPresent()) {
+      return;
+    }
+
+    String lastInstantTimestamp = bootstrapUntilInstantOpt.get();
+    boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+    // Check and then open the metadata table reader so the FILES partition can be read during initialization of other partitions
+    // initMetadataReader();
+    // Load the metadata table metaclient if required
+    if (dataMetaClient == null) {
+      dataMetaClient = HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+    }
+
+    // List all partitions from the source filesystem and build the FILES partition records
+    List<DirectoryInfo> partitionInfoList = listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+    // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = initializeFilesPartition(partitionInfoList);
+    Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = initializeFilesPartition2(lastInstantTimestamp, partitionInfoList);
+
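+    // If the FILES partition has not been initialized yet, lay out its file groups up front
+    // so the records built above can be bulk inserted into them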
+    try {
+      if (!filesPartitionAvailable) {
+        initializeFileGroups(metadataMetaClient, MetadataPartitionType.FILES, inflightInstantTimestamp, fileGroupCountAndRecordsPair.getKey());
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to bootstrap table " + sourceBasePath, e);
+    }
+
+    HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+    // Perform table services on the metadata table before committing
+    performTableServices(Option.of(inflightInstantTimestamp));
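+    // When the FILES partition was just initialized, perform the commit using bulkCommit (bulk insert
+    // into the new file groups) and mark the partition available; otherwise upsert through the regular commit path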
+    if (!filesPartitionAvailable) {
+      bulkCommit(inflightInstantTimestamp, MetadataPartitionType.FILES, records, fileGroupCountAndRecordsPair.getKey());
+      dataMetaClient.reloadActiveTimeline();
+      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, true);
+    } else {
+      commit(inflightInstantTimestamp, Collections.singletonMap(MetadataPartitionType.FILES, records));
+    }
+  }
+ }
+
+  protected Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition2(String lastInstantTimestamp, List<DirectoryInfo> partitionInfoList) {
Review Comment:
Added javadocs.
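
For reference, a doc comment along these lines would capture the contract visible in this hunk (a sketch inferred only from the signature and call sites quoted above; the actual javadoc added in the PR is outside the quoted range):

```java
/**
 * Builds the FILES partition records for the metadata table from the given
 * partition listings of the source table.
 *
 * @param lastInstantTimestamp instant up to which the source table has been listed
 * @param partitionInfoList    per-partition file listings from the source filesystem
 * @return a pair of the FILES partition file group count and the records to write
 */
```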