nsivabalan commented on code in PR #14260:
URL: https://github.com/apache/hudi/pull/14260#discussion_r2540499042
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends
SparkHoodieBackedTableMetadataWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+ private final String sourceBasePath;
+ private String inflightInstantTimestamp;
+
+ public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+ String inflightInstantTimestamp,
String sourceBasePath) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
Option.of(inflightInstantTimestamp));
+ this.sourceBasePath = sourceBasePath;
+ this.inflightInstantTimestamp = inflightInstantTimestamp;
+ }
+
+ public static HoodieTableMetadataWriter create(Configuration conf,
+ HoodieWriteConfig writeConfig,
+
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context,
+ String
inflightInstantTimestamp) {
+ return new SparkHoodieBackedMetadataSyncMetadataWriter(
+ conf, writeConfig, failedWritesCleaningPolicy, context,
inflightInstantTimestamp,
writeConfig.getMetadataConfig().getBasePathOverride());
+ }
+
+ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+ Option<String>
inflightInstantTimestamp) throws IOException {
+ // Do not initialize the metadata table during metadata sync
+ metadataMetaClient = initializeMetaClient();
+ this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+ return true;
+ }
+
+ public void bootstrap(Option<String> boostrapUntilInstantOpt) throws
IOException {
+ if (!boostrapUntilInstantOpt.isPresent()) {
+ return;
+ }
+
+ String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+ boolean filesPartitionAvailable =
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+ // Check and then open the metadata table reader so FILES partition can be
read during initialization of other partitions
+ // initMetadataReader();
+ // Load the metadata table metaclient if required
+ if (dataMetaClient == null) {
+ dataMetaClient =
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+ }
+
+ // initialize metadata writer
+ List<DirectoryInfo> partitionInfoList =
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+ // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair =
initializeFilesPartition(partitionInfoList);
+ Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair =
initializeFilesPartition2(lastInstantTimestamp, partitionInfoList);
+
+ try {
+ if (!filesPartitionAvailable) {
+ initializeFileGroups(metadataMetaClient, MetadataPartitionType.FILES,
inflightInstantTimestamp, fileGroupCountAndRecordsPair.getKey());
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to bootstrap table " + sourceBasePath,
e);
+ }
+
+ // Perform the commit using bulkCommit
+ HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+ // perform tables services on metadata table
+ performTableServices(Option.of(inflightInstantTimestamp));
+ if (!filesPartitionAvailable) {
+ bulkCommit(inflightInstantTimestamp, MetadataPartitionType.FILES,
records, fileGroupCountAndRecordsPair.getKey());
+ dataMetaClient.reloadActiveTimeline();
+
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient,
MetadataPartitionType.FILES, true);
+ } else {
+ commit(inflightInstantTimestamp,
Collections.singletonMap(MetadataPartitionType.FILES,
fileGroupCountAndRecordsPair.getValue()));
+ }
+ }
+
+ protected Pair<Integer, HoodieData<HoodieRecord>>
initializeFilesPartition2(String lastInstantTimestamp, List<DirectoryInfo>
partitionInfoList) {
Review Comment:
Fix the naming — `initializeFilesPartition2` is not a descriptive method name.
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -253,7 +253,7 @@ private Map<PartitionPath, List<FileSlice>>
loadFileSlicesForPartitions(List<Par
Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
HoodieTableFileSystemView fileSystemView =
- new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
+ new HoodieTableFileSystemView(metaClient, activeTimeline,
Option.of(metadataConfig.getNumPartitionLevels()), allFiles);
Review Comment:
This may not work; we have to do it differently.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends
SparkHoodieBackedTableMetadataWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+ private final String sourceBasePath;
+ private String inflightInstantTimestamp;
+
+ public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+ String inflightInstantTimestamp,
String sourceBasePath) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
Option.of(inflightInstantTimestamp));
+ this.sourceBasePath = sourceBasePath;
+ this.inflightInstantTimestamp = inflightInstantTimestamp;
+ }
+
+ public static HoodieTableMetadataWriter create(Configuration conf,
+ HoodieWriteConfig writeConfig,
+
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context,
+ String
inflightInstantTimestamp) {
+ return new SparkHoodieBackedMetadataSyncMetadataWriter(
+ conf, writeConfig, failedWritesCleaningPolicy, context,
inflightInstantTimestamp,
writeConfig.getMetadataConfig().getBasePathOverride());
+ }
+
+ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+ Option<String>
inflightInstantTimestamp) throws IOException {
+ // Do not initialize the metadata table during metadata sync
+ metadataMetaClient = initializeMetaClient();
+ this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
Review Comment:
don't we need to call `initMetadataReader()` if the FILES partition is
already bootstrapped?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends
SparkHoodieBackedTableMetadataWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+ private final String sourceBasePath;
+ private String inflightInstantTimestamp;
+
+ public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+ String inflightInstantTimestamp,
String sourceBasePath) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
Option.of(inflightInstantTimestamp));
+ this.sourceBasePath = sourceBasePath;
+ this.inflightInstantTimestamp = inflightInstantTimestamp;
+ }
+
+ public static HoodieTableMetadataWriter create(Configuration conf,
+ HoodieWriteConfig writeConfig,
+
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context,
+ String
inflightInstantTimestamp) {
+ return new SparkHoodieBackedMetadataSyncMetadataWriter(
+ conf, writeConfig, failedWritesCleaningPolicy, context,
inflightInstantTimestamp,
writeConfig.getMetadataConfig().getBasePathOverride());
+ }
+
+ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+ Option<String>
inflightInstantTimestamp) throws IOException {
+ // Do not initialize the metadata table during metadata sync
+ metadataMetaClient = initializeMetaClient();
+ this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+ return true;
+ }
+
+ public void bootstrap(Option<String> boostrapUntilInstantOpt) throws
IOException {
+ if (!boostrapUntilInstantOpt.isPresent()) {
+ return;
+ }
+
+ String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+ boolean filesPartitionAvailable =
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+ // Check and then open the metadata table reader so FILES partition can be
read during initialization of other partitions
+ // initMetadataReader();
+ // Load the metadata table metaclient if required
+ if (dataMetaClient == null) {
+ dataMetaClient =
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+ }
+
+ // initialize metadata writer
+ List<DirectoryInfo> partitionInfoList =
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+ // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair =
initializeFilesPartition(partitionInfoList);
+ Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair =
initializeFilesPartition2(lastInstantTimestamp, partitionInfoList);
+
+ try {
+ if (!filesPartitionAvailable) {
+ initializeFileGroups(metadataMetaClient, MetadataPartitionType.FILES,
inflightInstantTimestamp, fileGroupCountAndRecordsPair.getKey());
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to bootstrap table " + sourceBasePath,
e);
+ }
+
+ // Perform the commit using bulkCommit
Review Comment:
Fix the Javadoc — we may not do a bulk commit for every bootstrap flow.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends
SparkHoodieBackedTableMetadataWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+ private final String sourceBasePath;
+ private String inflightInstantTimestamp;
+
+ public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+ String inflightInstantTimestamp,
String sourceBasePath) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
Option.of(inflightInstantTimestamp));
+ this.sourceBasePath = sourceBasePath;
+ this.inflightInstantTimestamp = inflightInstantTimestamp;
+ }
+
+ public static HoodieTableMetadataWriter create(Configuration conf,
+ HoodieWriteConfig writeConfig,
+
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context,
+ String
inflightInstantTimestamp) {
+ return new SparkHoodieBackedMetadataSyncMetadataWriter(
+ conf, writeConfig, failedWritesCleaningPolicy, context,
inflightInstantTimestamp,
writeConfig.getMetadataConfig().getBasePathOverride());
+ }
+
+ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+ Option<String>
inflightInstantTimestamp) throws IOException {
+ // Do not initialize the metadata table during metadata sync
+ metadataMetaClient = initializeMetaClient();
+ this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+ return true;
+ }
+
+ public void bootstrap(Option<String> boostrapUntilInstantOpt) throws
IOException {
Review Comment:
Javadoc, please.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends
SparkHoodieBackedTableMetadataWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+ private final String sourceBasePath;
+ private String inflightInstantTimestamp;
+
+ public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+ String inflightInstantTimestamp,
String sourceBasePath) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
Option.of(inflightInstantTimestamp));
+ this.sourceBasePath = sourceBasePath;
+ this.inflightInstantTimestamp = inflightInstantTimestamp;
+ }
+
+ public static HoodieTableMetadataWriter create(Configuration conf,
+ HoodieWriteConfig writeConfig,
+
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context,
+ String
inflightInstantTimestamp) {
+ return new SparkHoodieBackedMetadataSyncMetadataWriter(
+ conf, writeConfig, failedWritesCleaningPolicy, context,
inflightInstantTimestamp,
writeConfig.getMetadataConfig().getBasePathOverride());
+ }
+
+ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+ Option<String>
inflightInstantTimestamp) throws IOException {
+ // Do not initialize the metadata table during metadata sync
+ metadataMetaClient = initializeMetaClient();
+ this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+ return true;
+ }
+
+ public void bootstrap(Option<String> boostrapUntilInstantOpt) throws
IOException {
+ if (!boostrapUntilInstantOpt.isPresent()) {
+ return;
+ }
+
+ String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+ boolean filesPartitionAvailable =
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+ // Check and then open the metadata table reader so FILES partition can be
read during initialization of other partitions
+ // initMetadataReader();
+ // Load the metadata table metaclient if required
+ if (dataMetaClient == null) {
+ dataMetaClient =
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+ }
+
+ // initialize metadata writer
+ List<DirectoryInfo> partitionInfoList =
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+ // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair =
initializeFilesPartition(partitionInfoList);
Review Comment:
Let's clean up the temporary changes (the commented-out code).
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataSync.java:
##########
@@ -245,90 +248,166 @@ public void run() throws Exception {
}
private void runMetadataSync(HoodieTableMetaClient sourceTableMetaClient,
HoodieTableMetaClient targetTableMetaClient, Schema schema) throws Exception {
- HoodieWriteConfig writeConfig = getWriteConfig(schema,
targetTableMetaClient, cfg.sourceBasePath);
+ HoodieWriteConfig writeConfig = getWriteConfig(schema,
targetTableMetaClient, cfg.sourceBasePath, cfg.boostrap);
+ HoodieSparkEngineContext hoodieSparkEngineContext = new
HoodieSparkEngineContext(jsc);
+ TransactionManager txnManager = new TransactionManager(writeConfig,
FSUtils.getFs(writeConfig.getBasePath(),
hoodieSparkEngineContext.getHadoopConf().get()));
+
+ HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig,
hoodieSparkEngineContext, targetTableMetaClient);
+ try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
+ if (cfg.boostrap) {
+ runBootstrapSync(sparkTable, sourceTableMetaClient,
targetTableMetaClient, writeClient, schema, txnManager);
Review Comment:
We are missing the lock for the bootstrap path.
Can we begin the txn at the beginning (before L257),
and then end the txn within a finally block (or at the end)?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataSync.java:
##########
@@ -245,90 +248,166 @@ public void run() throws Exception {
}
private void runMetadataSync(HoodieTableMetaClient sourceTableMetaClient,
HoodieTableMetaClient targetTableMetaClient, Schema schema) throws Exception {
- HoodieWriteConfig writeConfig = getWriteConfig(schema,
targetTableMetaClient, cfg.sourceBasePath);
+ HoodieWriteConfig writeConfig = getWriteConfig(schema,
targetTableMetaClient, cfg.sourceBasePath, cfg.boostrap);
+ HoodieSparkEngineContext hoodieSparkEngineContext = new
HoodieSparkEngineContext(jsc);
+ TransactionManager txnManager = new TransactionManager(writeConfig,
FSUtils.getFs(writeConfig.getBasePath(),
hoodieSparkEngineContext.getHadoopConf().get()));
+
+ HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig,
hoodieSparkEngineContext, targetTableMetaClient);
+ try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
+ if (cfg.boostrap) {
+ runBootstrapSync(sparkTable, sourceTableMetaClient,
targetTableMetaClient, writeClient, schema, txnManager);
+ } else {
+ List<HoodieInstant> instantsToSync =
getInstantsToSync(cfg.sourceBasePath, targetTableMetaClient,
sourceTableMetaClient);
+ for (HoodieInstant instant : instantsToSync) {
+ String commitTime = writeClient.startCommit(instant.getAction(),
targetTableMetaClient); // single writer. will rollback any pending commits
from previous round.
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .transitionRequestedToInflight(
+ instant.getAction(),
+ commitTime);
+
+ Option<byte[]> commitMetadataInBytes = Option.empty();
+ List<String> pendingInstants =
getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
+ SyncMetadata syncMetadata =
getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
+ targetTableMetaClient, cfg.sourceBasePath,
instant.getTimestamp(), pendingInstants);
+
+ try {
+ txnManager.beginTransaction(Option.of(instant), Option.empty());
+ HoodieTableMetadataWriter hoodieTableMetadataWriter =
+ (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get();
+ // perform table services if required on metadata table
+
hoodieTableMetadataWriter.performTableServices(Option.of(commitTime));
+ switch (instant.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ HoodieCommitMetadata sourceCommitMetadata =
getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
+ HoodieCommitMetadata tgtCommitMetadata =
buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
+ hoodieTableMetadataWriter.update(tgtCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+ // add metadata sync checkpoint info
+
tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
+ commitMetadataInBytes =
Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+ break;
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+
+ HoodieReplaceCommitMetadata srcReplaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
+
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
+ HoodieReplaceCommitMetadata tgtReplaceCommitMetadata =
buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+ hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+ // add metadata sync checkpoint info
+
tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
+ commitMetadataInBytes =
Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+ break;
+ case HoodieTimeline.CLEAN_ACTION:
+ HoodieCleanMetadata srcCleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
+
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
+ HoodieCleanMetadata tgtCleanMetadata =
reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
+ writeConfig, hoodieSparkEngineContext,
targetTableMetaClient);
+ //HoodieCleanMetadata tgtCleanMetadata =
buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
+ hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
+
+ commitMetadataInBytes =
TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
+ break;
+ }
+
+ } finally {
+ txnManager.endTransaction(Option.of(instant));
+ }
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .saveAsComplete(new HoodieInstant(true, instant.getAction(),
commitTime), commitMetadataInBytes);
+ }
+ if (cfg.performTableMaintenance) {
+ runArchiver(sparkTable, writeClient.getConfig(),
hoodieSparkEngineContext);
+ }
+ }
+ }
+ }
- if (cfg.boostrap) {
- HoodieBootstrapMetadataSync bootstrapMetadataSync = new
HoodieBootstrapMetadataSync(writeConfig, jsc, cfg.sourceBasePath,
cfg.targetBasePath, schema);
- bootstrapMetadataSync.run();
+ private void runBootstrapSync(HoodieSparkTable sparkTable,
HoodieTableMetaClient sourceTableMetaClient,
+ HoodieTableMetaClient targetTableMetaClient,
SparkRDDWriteClient writeClient, Schema schema, TransactionManager txnManager)
throws Exception {
+ Option<HoodieInstant> sourceLastInstant =
sourceTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
+ if (!sourceLastInstant.isPresent()) {
return;
}
- List<HoodieInstant> instantsToSync = getInstantsToSync(cfg.sourceBasePath,
targetTableMetaClient, sourceTableMetaClient);
- HoodieSparkEngineContext hoodieSparkEngineContext = new
HoodieSparkEngineContext(jsc);
- for (HoodieInstant instant : instantsToSync) {
- try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
- String commitTime = writeClient.startCommit(instant.getAction(),
targetTableMetaClient); // single writer. will rollback any pending commits
from previous round.
- targetTableMetaClient
- .reloadActiveTimeline()
- .transitionRequestedToInflight(
- instant.getAction(),
- commitTime);
-
- Option<byte[]> commitMetadataInBytes = Option.empty();
- List<String> pendingInstants =
getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
- SyncMetadata syncMetadata =
getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
- targetTableMetaClient,
sourceTableMetaClient.getBasePathV2().toString(), instant.getTimestamp(),
pendingInstants);
-
-
- HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig,
hoodieSparkEngineContext, targetTableMetaClient);
- if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
- HoodieCommitMetadata sourceCommitMetadata =
getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
- HoodieCommitMetadata tgtCommitMetadata =
buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
-
- try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
- (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get()) {
- hoodieTableMetadataWriter.update(tgtCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
- }
+ String commitTime =
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION,
targetTableMetaClient); // single writer. will rollback any pending commits
from previous round.
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .transitionRequestedToInflight(
+ HoodieTimeline.REPLACE_COMMIT_ACTION,
+ commitTime);
- // add metadata sync checkpoint info
- tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
- commitMetadataInBytes =
Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
- } else if
(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ HoodieInstant instant = new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime);
+ try {
+ txnManager.beginTransaction(Option.of(instant), Option.empty());
+ SparkHoodieBackedMetadataSyncMetadataWriter metadataWriter =
+ (SparkHoodieBackedMetadataSyncMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get();
+
metadataWriter.bootstrap(sourceLastInstant.map(HoodieInstant::getTimestamp));
+ } finally {
+ txnManager.endTransaction(Option.of(instant));
+ }
+ Option<HoodieInstant> targetTableLastInstant =
targetTableMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+ SyncMetadata syncMetadata =
getTableSyncExtraMetadata(targetTableLastInstant, targetTableMetaClient,
+ cfg.sourceBasePath, sourceLastInstant.get().getTimestamp(),
getPendingInstants(sourceTableMetaClient.getActiveTimeline(),
sourceLastInstant.get()));
+ HoodieReplaceCommitMetadata replaceCommitMetadata =
buildComprehensiveReplaceCommitMetadata(sourceTableMetaClient, schema);
+ replaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .saveAsComplete(new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime),
+
Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
- HoodieReplaceCommitMetadata srcReplaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
-
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
- HoodieReplaceCommitMetadata tgtReplaceCommitMetadata =
buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+ private HoodieReplaceCommitMetadata
buildComprehensiveReplaceCommitMetadata(HoodieTableMetaClient
sourceTableMetaClient, Schema schema) {
+ List<HoodieInstant> replaceCommits =
sourceTableMetaClient.getActiveTimeline().filterCompletedInstants().getInstants().stream()
+ .filter(instant ->
instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).collect(Collectors.toList());
+
+ HoodieReplaceCommitMetadata replaceCommitMetadata = new
HoodieReplaceCommitMetadata();
+ Map<String, List<String>> totalPartitionToReplacedFiles = new HashMap<>();
+ replaceCommits.forEach(replaceCommit -> {
+ try {
+ HoodieReplaceCommitMetadata commitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
+
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(replaceCommit).get(),
HoodieReplaceCommitMetadata.class);
+ Map<String, List<String>> partitionsToReplacedFileGroups =
commitMetadata.getPartitionToReplaceFileIds();
+ for (Map.Entry<String, List<String>> entry :
partitionsToReplacedFileGroups.entrySet()) {
+ String partition = entry.getKey();
+ List<String> replacedFiles = entry.getValue();
+ totalPartitionToReplacedFiles.computeIfAbsent(partition, k -> new
ArrayList<>());
+ totalPartitionToReplacedFiles.get(partition).addAll(replacedFiles);
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to deserialize instant " +
replaceCommit.getTimestamp(), e);
+ }
+ });
- try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
- (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get()) {
- hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
- }
- // add metadata sync checkpoint info
-
tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
- commitMetadataInBytes =
Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
- } else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) {
- HoodieCleanMetadata srcCleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
-
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
- HoodieCleanMetadata tgtCleanMetadata =
reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
- writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
- //HoodieCleanMetadata tgtCleanMetadata =
buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
- try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
- (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get()) {
- hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
- }
+
replaceCommitMetadata.setPartitionToReplaceFileIds(totalPartitionToReplacedFiles);
- commitMetadataInBytes =
TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
- }
+ replaceCommitMetadata.addMetadata("schema", schema.toString());
+ replaceCommitMetadata.setOperationType(WriteOperationType.BOOTSTRAP);
+ replaceCommitMetadata.setCompacted(false);
+ return replaceCommitMetadata;
+ }
- targetTableMetaClient
- .reloadActiveTimeline()
- .saveAsComplete(new HoodieInstant(true, instant.getAction(),
commitTime), commitMetadataInBytes);
- }
+ private void runArchiver(
Review Comment:
   Please add Javadoc documenting this method's purpose, parameters, and behavior.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends
SparkHoodieBackedTableMetadataWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+ private final String sourceBasePath;
+ private String inflightInstantTimestamp;
+
+ public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+ String inflightInstantTimestamp,
String sourceBasePath) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
Option.of(inflightInstantTimestamp));
+ this.sourceBasePath = sourceBasePath;
+ this.inflightInstantTimestamp = inflightInstantTimestamp;
+ }
+
+ public static HoodieTableMetadataWriter create(Configuration conf,
+ HoodieWriteConfig writeConfig,
+
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context,
+ String
inflightInstantTimestamp) {
+ return new SparkHoodieBackedMetadataSyncMetadataWriter(
+ conf, writeConfig, failedWritesCleaningPolicy, context,
inflightInstantTimestamp,
writeConfig.getMetadataConfig().getBasePathOverride());
+ }
+
+ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+ Option<String>
inflightInstantTimestamp) throws IOException {
+ // Do not initialize the metadata table during metadata sync
+ metadataMetaClient = initializeMetaClient();
+ this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+ return true;
+ }
+
+ public void bootstrap(Option<String> boostrapUntilInstantOpt) throws
IOException {
+ if (!boostrapUntilInstantOpt.isPresent()) {
+ return;
+ }
+
+ String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+ boolean filesPartitionAvailable =
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+ // Check and then open the metadata table reader so FILES partition can be
read during initialization of other partitions
+ // initMetadataReader();
+ // Load the metadata table metaclient if required
+ if (dataMetaClient == null) {
+ dataMetaClient =
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+ }
+
+ // initialize metadata writer
+ List<DirectoryInfo> partitionInfoList =
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
Review Comment:
   If the metadata table is enabled in the source table, shouldn't we be calling
   `listAllPartitionsFromMDT(...)` instead?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends
SparkHoodieBackedTableMetadataWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+ private final String sourceBasePath;
+ private String inflightInstantTimestamp;
+
+ public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+ String inflightInstantTimestamp,
String sourceBasePath) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
Option.of(inflightInstantTimestamp));
+ this.sourceBasePath = sourceBasePath;
+ this.inflightInstantTimestamp = inflightInstantTimestamp;
+ }
+
+ public static HoodieTableMetadataWriter create(Configuration conf,
+ HoodieWriteConfig writeConfig,
+
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context,
+ String
inflightInstantTimestamp) {
+ return new SparkHoodieBackedMetadataSyncMetadataWriter(
+ conf, writeConfig, failedWritesCleaningPolicy, context,
inflightInstantTimestamp,
writeConfig.getMetadataConfig().getBasePathOverride());
+ }
+
+ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+ Option<String>
inflightInstantTimestamp) throws IOException {
+ // Do not initialize the metadata table during metadata sync
+ metadataMetaClient = initializeMetaClient();
+ this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+ return true;
+ }
+
+ public void bootstrap(Option<String> boostrapUntilInstantOpt) throws
IOException {
+ if (!boostrapUntilInstantOpt.isPresent()) {
+ return;
+ }
+
+ String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+ boolean filesPartitionAvailable =
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+ // Check and then open the metadata table reader so FILES partition can be
read during initialization of other partitions
+ // initMetadataReader();
+ // Load the metadata table metaclient if required
+ if (dataMetaClient == null) {
+ dataMetaClient =
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+ }
+
+ // initialize metadata writer
+ List<DirectoryInfo> partitionInfoList =
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+ // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair =
initializeFilesPartition(partitionInfoList);
+ Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair =
initializeFilesPartition2(lastInstantTimestamp, partitionInfoList);
+
+ try {
+ if (!filesPartitionAvailable) {
+ initializeFileGroups(metadataMetaClient, MetadataPartitionType.FILES,
inflightInstantTimestamp, fileGroupCountAndRecordsPair.getKey());
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to bootstrap table " + sourceBasePath,
e);
+ }
+
+ // Perform the commit using bulkCommit
+ HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+ // perform tables services on metadata table
+ performTableServices(Option.of(inflightInstantTimestamp));
+ if (!filesPartitionAvailable) {
+ bulkCommit(inflightInstantTimestamp, MetadataPartitionType.FILES,
records, fileGroupCountAndRecordsPair.getKey());
+ dataMetaClient.reloadActiveTimeline();
+
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient,
MetadataPartitionType.FILES, true);
+ } else {
+ commit(inflightInstantTimestamp,
Collections.singletonMap(MetadataPartitionType.FILES,
fileGroupCountAndRecordsPair.getValue()));
+ }
+ }
+
+ protected Pair<Integer, HoodieData<HoodieRecord>>
initializeFilesPartition2(String lastInstantTimestamp, List<DirectoryInfo>
partitionInfoList) {
Review Comment:
   Please add Javadoc. Also, how is this different from the implementation we already have in the base class?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends
SparkHoodieBackedTableMetadataWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+ private final String sourceBasePath;
+ private String inflightInstantTimestamp;
+
+ public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+ String inflightInstantTimestamp,
String sourceBasePath) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
Option.of(inflightInstantTimestamp));
+ this.sourceBasePath = sourceBasePath;
+ this.inflightInstantTimestamp = inflightInstantTimestamp;
+ }
+
+ public static HoodieTableMetadataWriter create(Configuration conf,
+ HoodieWriteConfig writeConfig,
+
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context,
+ String
inflightInstantTimestamp) {
+ return new SparkHoodieBackedMetadataSyncMetadataWriter(
+ conf, writeConfig, failedWritesCleaningPolicy, context,
inflightInstantTimestamp,
writeConfig.getMetadataConfig().getBasePathOverride());
+ }
+
+ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+ Option<String>
inflightInstantTimestamp) throws IOException {
+ // Do not initialize the metadata table during metadata sync
+ metadataMetaClient = initializeMetaClient();
+ this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+ return true;
+ }
+
+ public void bootstrap(Option<String> boostrapUntilInstantOpt) throws
IOException {
+ if (!boostrapUntilInstantOpt.isPresent()) {
+ return;
+ }
+
+ String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+ boolean filesPartitionAvailable =
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+ // Check and then open the metadata table reader so FILES partition can be
read during initialization of other partitions
+ // initMetadataReader();
+ // Load the metadata table metaclient if required
+ if (dataMetaClient == null) {
+ dataMetaClient =
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+ }
+
+ // initialize metadata writer
+ List<DirectoryInfo> partitionInfoList =
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+ // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair =
initializeFilesPartition(partitionInfoList);
+ Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair =
initializeFilesPartition2(lastInstantTimestamp, partitionInfoList);
+
+ try {
+ if (!filesPartitionAvailable) {
+ initializeFileGroups(metadataMetaClient, MetadataPartitionType.FILES,
inflightInstantTimestamp, fileGroupCountAndRecordsPair.getKey());
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to bootstrap table " + sourceBasePath,
e);
+ }
+
+ // Perform the commit using bulkCommit
+ HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+ // perform tables services on metadata table
+ performTableServices(Option.of(inflightInstantTimestamp));
+ if (!filesPartitionAvailable) {
+ bulkCommit(inflightInstantTimestamp, MetadataPartitionType.FILES,
records, fileGroupCountAndRecordsPair.getKey());
+ dataMetaClient.reloadActiveTimeline();
+
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient,
MetadataPartitionType.FILES, true);
+ } else {
+ commit(inflightInstantTimestamp,
Collections.singletonMap(MetadataPartitionType.FILES,
fileGroupCountAndRecordsPair.getValue()));
+ }
+ }
+
+ protected Pair<Integer, HoodieData<HoodieRecord>>
initializeFilesPartition2(String lastInstantTimestamp, List<DirectoryInfo>
partitionInfoList) {
+ // FILES partition uses a single file group
+ final int fileGroupCount = 1;
+
+ List<String> partitions = partitionInfoList.stream().map(p ->
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()))
+ .collect(Collectors.toList());
+ final int totalDataFilesCount =
partitionInfoList.stream().mapToInt(DirectoryInfo::getTotalFiles).sum();
+ LOG.info("Committing total {} partitions and {} files to metadata",
partitions.size(), totalDataFilesCount);
+
+ // Record which saves the list of all partitions
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionListRecord(partitions);
+ HoodieData<HoodieRecord> allPartitionsRecord =
engineContext.parallelize(Collections.singletonList(record), 1);
+ if (partitionInfoList.isEmpty()) {
+ return Pair.of(fileGroupCount, allPartitionsRecord);
+ }
+
+ HoodieTableMetaClient sourceTableMetaClient =
HoodieTableMetaClient.builder().setBasePath(sourceBasePath).setConf(hadoopConf.get()).build();
+ FileSystemViewStorageConfig.Builder spillableConfBuilder =
FileSystemViewStorageConfig.newBuilder();
+
spillableConfBuilder.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK);
+
//.withBaseStoreDir(FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
+
//.withMemFractionForPendingCompaction(config.memFractionForCompactionPerTable);
+ HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
+ HoodieTimeline timeline =
sourceTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+ SpillableMapBasedFileSystemView fileSystemView = new
SpillableMapBasedFileSystemView(sourceTableMetaClient, timeline,
spillableConfBuilder.build(), commonConfig);
Review Comment:
   We should close the file system view (FSV) at the end to avoid leaking resources — it is `Closeable`, so a try-with-resources would work here.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataSync.java:
##########
@@ -245,90 +248,166 @@ public void run() throws Exception {
}
private void runMetadataSync(HoodieTableMetaClient sourceTableMetaClient,
HoodieTableMetaClient targetTableMetaClient, Schema schema) throws Exception {
- HoodieWriteConfig writeConfig = getWriteConfig(schema,
targetTableMetaClient, cfg.sourceBasePath);
+ HoodieWriteConfig writeConfig = getWriteConfig(schema,
targetTableMetaClient, cfg.sourceBasePath, cfg.boostrap);
+ HoodieSparkEngineContext hoodieSparkEngineContext = new
HoodieSparkEngineContext(jsc);
+ TransactionManager txnManager = new TransactionManager(writeConfig,
FSUtils.getFs(writeConfig.getBasePath(),
hoodieSparkEngineContext.getHadoopConf().get()));
+
+ HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig,
hoodieSparkEngineContext, targetTableMetaClient);
+ try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
+ if (cfg.boostrap) {
+ runBootstrapSync(sparkTable, sourceTableMetaClient,
targetTableMetaClient, writeClient, schema, txnManager);
+ } else {
+ List<HoodieInstant> instantsToSync =
getInstantsToSync(cfg.sourceBasePath, targetTableMetaClient,
sourceTableMetaClient);
+ for (HoodieInstant instant : instantsToSync) {
+ String commitTime = writeClient.startCommit(instant.getAction(),
targetTableMetaClient); // single writer. will rollback any pending commits
from previous round.
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .transitionRequestedToInflight(
+ instant.getAction(),
+ commitTime);
+
+ Option<byte[]> commitMetadataInBytes = Option.empty();
+ List<String> pendingInstants =
getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
+ SyncMetadata syncMetadata =
getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
+ targetTableMetaClient, cfg.sourceBasePath,
instant.getTimestamp(), pendingInstants);
+
+ try {
+ txnManager.beginTransaction(Option.of(instant), Option.empty());
+ HoodieTableMetadataWriter hoodieTableMetadataWriter =
+ (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get();
+ // perform table services if required on metadata table
+
hoodieTableMetadataWriter.performTableServices(Option.of(commitTime));
+ switch (instant.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ HoodieCommitMetadata sourceCommitMetadata =
getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
+ HoodieCommitMetadata tgtCommitMetadata =
buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
+ hoodieTableMetadataWriter.update(tgtCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+ // add metadata sync checkpoint info
+
tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
+ commitMetadataInBytes =
Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+ break;
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+
+ HoodieReplaceCommitMetadata srcReplaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
+
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
+ HoodieReplaceCommitMetadata tgtReplaceCommitMetadata =
buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+ hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+ // add metadata sync checkpoint info
+
tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
+ commitMetadataInBytes =
Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+ break;
+ case HoodieTimeline.CLEAN_ACTION:
+ HoodieCleanMetadata srcCleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
+
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
+ HoodieCleanMetadata tgtCleanMetadata =
reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
+ writeConfig, hoodieSparkEngineContext,
targetTableMetaClient);
+ //HoodieCleanMetadata tgtCleanMetadata =
buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
+ hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
+
+ commitMetadataInBytes =
TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
+ break;
+ }
+
+ } finally {
+ txnManager.endTransaction(Option.of(instant));
+ }
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .saveAsComplete(new HoodieInstant(true, instant.getAction(),
commitTime), commitMetadataInBytes);
+ }
+ if (cfg.performTableMaintenance) {
+ runArchiver(sparkTable, writeClient.getConfig(),
hoodieSparkEngineContext);
+ }
+ }
+ }
+ }
- if (cfg.boostrap) {
- HoodieBootstrapMetadataSync bootstrapMetadataSync = new
HoodieBootstrapMetadataSync(writeConfig, jsc, cfg.sourceBasePath,
cfg.targetBasePath, schema);
- bootstrapMetadataSync.run();
+ private void runBootstrapSync(HoodieSparkTable sparkTable,
HoodieTableMetaClient sourceTableMetaClient,
+ HoodieTableMetaClient targetTableMetaClient,
SparkRDDWriteClient writeClient, Schema schema, TransactionManager txnManager)
throws Exception {
+ Option<HoodieInstant> sourceLastInstant =
sourceTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
+ if (!sourceLastInstant.isPresent()) {
return;
}
- List<HoodieInstant> instantsToSync = getInstantsToSync(cfg.sourceBasePath,
targetTableMetaClient, sourceTableMetaClient);
- HoodieSparkEngineContext hoodieSparkEngineContext = new
HoodieSparkEngineContext(jsc);
- for (HoodieInstant instant : instantsToSync) {
- try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
- String commitTime = writeClient.startCommit(instant.getAction(),
targetTableMetaClient); // single writer. will rollback any pending commits
from previous round.
- targetTableMetaClient
- .reloadActiveTimeline()
- .transitionRequestedToInflight(
- instant.getAction(),
- commitTime);
-
- Option<byte[]> commitMetadataInBytes = Option.empty();
- List<String> pendingInstants =
getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
- SyncMetadata syncMetadata =
getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
- targetTableMetaClient,
sourceTableMetaClient.getBasePathV2().toString(), instant.getTimestamp(),
pendingInstants);
-
-
- HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig,
hoodieSparkEngineContext, targetTableMetaClient);
- if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
- HoodieCommitMetadata sourceCommitMetadata =
getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
- HoodieCommitMetadata tgtCommitMetadata =
buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
-
- try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
- (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get()) {
- hoodieTableMetadataWriter.update(tgtCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
- }
+ String commitTime =
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION,
targetTableMetaClient); // single writer. will rollback any pending commits
from previous round.
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .transitionRequestedToInflight(
+ HoodieTimeline.REPLACE_COMMIT_ACTION,
+ commitTime);
- // add metadata sync checkpoint info
- tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
- commitMetadataInBytes =
Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
- } else if
(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ HoodieInstant instant = new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime);
+ try {
+ txnManager.beginTransaction(Option.of(instant), Option.empty());
+ SparkHoodieBackedMetadataSyncMetadataWriter metadataWriter =
+ (SparkHoodieBackedMetadataSyncMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get();
+
metadataWriter.bootstrap(sourceLastInstant.map(HoodieInstant::getTimestamp));
+ } finally {
+ txnManager.endTransaction(Option.of(instant));
+ }
+ Option<HoodieInstant> targetTableLastInstant =
targetTableMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+ SyncMetadata syncMetadata =
getTableSyncExtraMetadata(targetTableLastInstant, targetTableMetaClient,
+ cfg.sourceBasePath, sourceLastInstant.get().getTimestamp(),
getPendingInstants(sourceTableMetaClient.getActiveTimeline(),
sourceLastInstant.get()));
+ HoodieReplaceCommitMetadata replaceCommitMetadata =
buildComprehensiveReplaceCommitMetadata(sourceTableMetaClient, schema);
+ replaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .saveAsComplete(new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime),
+
Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
- HoodieReplaceCommitMetadata srcReplaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
-
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
- HoodieReplaceCommitMetadata tgtReplaceCommitMetadata =
buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+ private HoodieReplaceCommitMetadata
buildComprehensiveReplaceCommitMetadata(HoodieTableMetaClient
sourceTableMetaClient, Schema schema) {
+ List<HoodieInstant> replaceCommits =
sourceTableMetaClient.getActiveTimeline().filterCompletedInstants().getInstants().stream()
+ .filter(instant ->
instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).collect(Collectors.toList());
+
+ HoodieReplaceCommitMetadata replaceCommitMetadata = new
HoodieReplaceCommitMetadata();
+ Map<String, List<String>> totalPartitionToReplacedFiles = new HashMap<>();
+ replaceCommits.forEach(replaceCommit -> {
+ try {
+ HoodieReplaceCommitMetadata commitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
+
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(replaceCommit).get(),
HoodieReplaceCommitMetadata.class);
+ Map<String, List<String>> partitionsToReplacedFileGroups =
commitMetadata.getPartitionToReplaceFileIds();
+ for (Map.Entry<String, List<String>> entry :
partitionsToReplacedFileGroups.entrySet()) {
+ String partition = entry.getKey();
+ List<String> replacedFiles = entry.getValue();
+ totalPartitionToReplacedFiles.computeIfAbsent(partition, k -> new
ArrayList<>());
+ totalPartitionToReplacedFiles.get(partition).addAll(replacedFiles);
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to deserialize instant " +
replaceCommit.getTimestamp(), e);
+ }
+ });
- try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
- (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get()) {
- hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
- }
- // add metadata sync checkpoint info
-
tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
- commitMetadataInBytes =
Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
- } else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) {
- HoodieCleanMetadata srcCleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
-
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
- HoodieCleanMetadata tgtCleanMetadata =
reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
- writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
- //HoodieCleanMetadata tgtCleanMetadata =
buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
- try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
- (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get()) {
- hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
- }
+
replaceCommitMetadata.setPartitionToReplaceFileIds(totalPartitionToReplacedFiles);
- commitMetadataInBytes =
TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
- }
+ replaceCommitMetadata.addMetadata("schema", schema.toString());
+ replaceCommitMetadata.setOperationType(WriteOperationType.BOOTSTRAP);
+ replaceCommitMetadata.setCompacted(false);
+ return replaceCommitMetadata;
+ }
- targetTableMetaClient
- .reloadActiveTimeline()
- .saveAsComplete(new HoodieInstant(true, instant.getAction(),
commitTime), commitMetadataInBytes);
- }
+ private void runArchiver(
Review Comment:
Don't we need a lock here?
What if two concurrent writers try to archive the target table at the same time?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]