vinothchandar commented on a change in pull request #3590:
URL: https://github.com/apache/hudi/pull/3590#discussion_r716227697
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -88,6 +91,7 @@
protected HoodieBackedTableMetadata metadata;
protected HoodieTableMetaClient metaClient;
+ protected HoodieTableMetaClient datasetMetaClient;
Review comment:
rename: dataMetaClient
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -121,8 +118,13 @@ public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Optio
  }
  @Override
-  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
-    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf,
Review comment:
@danny0405 @yanghua @leesf can one of you please review the Flink integration pieces and make a suggestion for how we can make Flink write out to the metadata table seamlessly as well. It could happen in a different PR, but I would like to treat Spark and Flink (and eventually Java) the same going forward, i.e. these large features are done hand-in-hand.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +394,83 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
  }
  /**
-  * Sync the Metadata Table from the instants created on the dataset.
+  * Initialize file groups for a partition. For file listing, we just have one file group.
   *
-  * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+  * All FileGroups for a given metadata partition has a fixed prefix as per the {@link MetadataPartitionType#getFileIdPrefix()}.
+  * Each file group is suffixed with increments of 1 starting with 1.
Review comment:
why not start with 0? do you specifically want `1` for some reason?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +394,83 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
  }
  /**
-  * Sync the Metadata Table from the instants created on the dataset.
+  * Initialize file groups for a partition. For file listing, we just have one file group.
   *
-  * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+  * All FileGroups for a given metadata partition has a fixed prefix as per the {@link MetadataPartitionType#getFileIdPrefix()}.
+  * Each file group is suffixed with increments of 1 starting with 1.
+  *
+  * For instance, for FILES, there is only one file group named as "files-1"
+  * Lets say we configure 10 file groups for record level index, and prefix as "record-index-bucket-"
+  * Filegroups will be named as :
+  * record-index-bucket-01
+  * record-index-bucket-02
+  * ...
+  * record-index-bucket-10
   */
-  private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it is not enabled");
-    // (re) init the metadata for reading.
-    initTableMetadata();
-    try {
-      List<HoodieInstant> instantsToSync = metadata.findInstantsToSyncForWriter();
-      if (instantsToSync.isEmpty()) {
-        return;
-      }
-
-      LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);
-
-      // Read each instant in order and sync it to metadata table
-      for (HoodieInstant instant : instantsToSync) {
-        LOG.info("Syncing instant " + instant + " to metadata table");
-
-        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient,
-            metaClient.getActiveTimeline(), instant, metadata.getUpdateTime());
-        if (records.isPresent()) {
-          commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp());
-        }
+  private void initializeFileGroups(HoodieTableMetaClient datasetMetaClient, MetadataPartitionType metadataPartition, String instantTime,
+      int fileGroupCount) throws IOException {
+
+    final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+    blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+    // Archival of data table has a dependency on compaction(base files) in metadata table.
+    // It is assumed that as of time Tx of base instant (/compaction time) in metadata table,
+    // all commits in data table are in sync with metadata table. So, we always start with a log file for any fileGroup.
+    final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0], blockHeader);
Review comment:
this is pretty hacky. :D
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -389,7 +401,19 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
          + config.getBasePath() + " at time " + clusteringCommitTime, e);
    }
  }
-  LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+  LOG.warn("Clustering successfully on commit " + clusteringCommitTime);
+ }
+
+ private void writeTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata, String commitTime) {
+   try {
+     this.txnManager.beginTransaction(Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime)),
Review comment:
this method's name/params don't indicate that it works on clustering at all.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -96,6 +94,11 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC
  public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
                             Option<EmbeddedTimelineService> timelineService) {
    super(context, writeConfig, timelineService);
+   if (config.isMetadataTableEnabled()) {
+     // If the metadata table does not exist, it should be bootstrapped here
Review comment:
is this bootstrap multi-writer safe?
two scenarios to consider:
a) two writers (i.e. delta sync and a Spark job) trying to do this.
b) just a single writer, but with async compaction/clustering (which would also create write clients).
I am not sure if metadata being enabled is the only flag to check here.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -314,12 +323,13 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
          + config.getBasePath() + " at time " + compactionCommitTime, e);
    }
  }
-  LOG.info("Compacted successfully on commit " + compactionCommitTime);
+  LOG.warn("Compacted successfully on commit " + compactionCommitTime);
Review comment:
why would this be a warn?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
  protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
    return SparkHoodieIndex.createIndex(config);
  }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated calls to getMetadataWriter(). Please remove memoization if
+        // that's not the case.
+        try {
+          if (!config.isMetadataTableEnabled() || !metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+            isMetadataTableAvailable = false;
+          } else {
+            isMetadataTableAvailable = true;
+          }
+        } catch (IOException e) {
+          throw new HoodieMetadataException("Could not instantiate metadata table writer", e);
+        }
+      }
+    }
+    if (isMetadataTableAvailable) {
Review comment:
something without the `if`. (general code style issue that I wonder about a lot: getting rid of these `if` blocks with `Option`).
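As a sketch of the Option-chaining style suggested here (using `java.util.Optional` as a stand-in for Hudi's `Option`, and a hypothetical `createWriter()` in place of the real `SparkHoodieBackedTableMetadataWriter.create(...)` call):

```java
import java.util.Optional;

public class MetadataWriterOption {
    // Hypothetical stand-in for SparkHoodieBackedTableMetadataWriter.create(...)
    static String createWriter() {
        return "metadata-writer";
    }

    // Replaces the if/else: yield an empty Optional unless the table is available.
    static Optional<String> getMetadataWriter(boolean isMetadataTableAvailable) {
        return Optional.of(isMetadataTableAvailable)
                .filter(available -> available)
                .map(available -> createWriter());
    }

    public static void main(String[] args) {
        System.out.println(getMetadataWriter(true));   // Optional[metadata-writer]
        System.out.println(getMetadataWriter(false));  // Optional.empty
    }
}
```

The `filter(available -> available)` step is what folds the boolean check into the chain, so the caller never branches explicitly.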
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClie
  @Override
  protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
    ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-   JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+   JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
-     writeClient.startCommitWithTime(instantTime);
+     if (!metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+       // if this is a new commit being applied to metadata for the first time
+       writeClient.startCommitWithTime(instantTime);
+     } else {
+       // this code path refers to a re-attempted commit that got committed to metadata, but failed in dataset.
+       // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+       // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+       // are upserts to metadata table and so only a new delta commit will be created.
+       // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+       // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+       // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+       HoodieInstant alreadyCompletedInstant = metaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+       FSUtils.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), alreadyCompletedInstant);
+       metaClient.reloadActiveTimeline();
+     }
    List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect();
    statuses.forEach(writeStatus -> {
      if (writeStatus.hasErrors()) {
        throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
      }
    });
-   // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
-   // delta commits synced over will not have an instant time lesser than the last completed instant on the
-   // metadata table.
-   if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
-     writeClient.compact(instantTime + "001");
-   }
-   writeClient.clean(instantTime + "002");
+
+   // reload timeline
+   metaClient.reloadActiveTimeline();
+
+   compactIfNecessary(writeClient, instantTime);
+   doClean(writeClient, instantTime);
  }
  // Update total size of the metadata and count of base/log files
- metrics.ifPresent(m -> {
-   try {
-     Map<String, String> stats = m.getStats(false, metaClient, metadata);
-     m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
-         Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
-         Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
-         Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
-   } catch (HoodieIOException e) {
-     LOG.error("Could not publish metadata size metrics", e);
-   }
- });
+ metrics.ifPresent(m -> m.updateSizeMetrics(metaClient, metadata));
}
/**
- * Tag each record with the location.
+ * Perform a compaction on the Metadata Table.
+ *
+ * Cases to be handled:
+ * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+ *    a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
 *
- * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
- * base file.
+ * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
+ *    deltacommit.
 */
- private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
-   HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, engineContext);
-   TableFileSystemView.SliceView fsView = table.getSliceView();
-   List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-       .map(FileSlice::getBaseFile)
-       .filter(Option::isPresent)
-       .map(Option::get)
-       .collect(Collectors.toList());
-
-   // All the metadata fits within a single base file
-   if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-     if (baseFiles.size() > 1) {
-       throw new HoodieMetadataException("Multiple base files found in metadata partition");
-     }
+ private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
+   String latestDeltacommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+       .get().getTimestamp();
+   List<HoodieInstant> pendingInstants = datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+       .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+   if (!pendingInstants.isEmpty()) {
+     LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
+         pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+     return;
    }
-   JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-   String fileId;
-   String instantTime;
-   if (!baseFiles.isEmpty()) {
-     fileId = baseFiles.get(0).getFileId();
-     instantTime = baseFiles.get(0).getCommitTime();
-   } else {
-     // If there is a log file then we can assume that it has the data
-     List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
-         .map(FileSlice::getLatestLogFile)
-         .filter(Option::isPresent)
-         .map(Option::get)
-         .collect(Collectors.toList());
-     if (logFiles.isEmpty()) {
-       // No base and log files. All are new inserts
-       return jsc.parallelize(records, 1);
-     }
-
-     fileId = logFiles.get(0).getFileId();
-     instantTime = logFiles.get(0).getBaseCommitTime();
+   // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+   // delta commits synced over will not have an instant time lesser than the last completed instant on the
+   // metadata table.
+   final String compactionInstantTime = latestDeltacommitTime + "001";
+   if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
Review comment:
metadataWriteClient?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -314,12 +323,13 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
          + config.getBasePath() + " at time " + compactionCommitTime, e);
    }
  }
-  LOG.info("Compacted successfully on commit " + compactionCommitTime);
+  LOG.warn("Compacted successfully on commit " + compactionCommitTime);
  }
  @Override
  protected JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+   table.getHoodieView().sync();
Review comment:
I would like to avoid these `sync()` calls being added everywhere. So far, the writeClient gets "synced" when it's created and that's that. Sync-ing in every preWrite is more predictable, so I am down with that, but these kinds of calls added to each API could make maintenance difficult for us.
Also, why would this not be done at the superclass level, for Flink and Java as well?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
  protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
    return SparkHoodieIndex.createIndex(config);
  }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated calls to getMetadataWriter(). Please remove memoization if
+        // that's not the case.
+        try {
+          if (!config.isMetadataTableEnabled() || !metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
Review comment:
`isMetadataTableAvailable = config.isMetadataTableEnabled() && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))`?
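The suggestion above collapses the four-line if/else into a single boolean expression. A minimal, self-contained sketch of the same shape (with `enabled` and `pathExists` as hypothetical stand-ins for `config.isMetadataTableEnabled()` and the `metaClient.getFs().exists(...)` call):

```java
public class AvailabilityCheck {
    // Equivalent to: if (!enabled || !pathExists) { available = false; } else { available = true; }
    static boolean isMetadataTableAvailable(boolean enabled, boolean pathExists) {
        return enabled && pathExists;
    }

    public static void main(String[] args) {
        System.out.println(isMetadataTableAvailable(true, true));   // true
        System.out.println(isMetadataTableAvailable(true, false));  // false
        System.out.println(isMetadataTableAvailable(false, true));  // false
    }
}
```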
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +394,83 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
  }
  /**
-  * Sync the Metadata Table from the instants created on the dataset.
+  * Initialize file groups for a partition. For file listing, we just have one file group.
   *
-  * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+  * All FileGroups for a given metadata partition has a fixed prefix as per the {@link MetadataPartitionType#getFileIdPrefix()}.
+  * Each file group is suffixed with increments of 1 starting with 1.
+  *
+  * For instance, for FILES, there is only one file group named as "files-1"
+  * Lets say we configure 10 file groups for record level index, and prefix as "record-index-bucket-"
Review comment:
it may be okay to zero-pad these file group numbers up to 3 digits by default during naming, to keep them all consistent in most cases. I'll leave it to you.
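The zero-padding suggestion can be sketched with a hypothetical helper (not the PR's actual naming code): a fixed three-digit suffix keeps the ids the same length and lexicographically ordered.

```java
public class FileGroupNaming {
    // Hypothetical helper: pad the numeric suffix to three digits,
    // e.g. "record-index-bucket-" + 1 -> "record-index-bucket-001".
    static String fileGroupId(String fileIdPrefix, int index) {
        return String.format("%s%03d", fileIdPrefix, index);
    }

    public static void main(String[] args) {
        System.out.println(fileGroupId("record-index-bucket-", 1));   // record-index-bucket-001
        System.out.println(fileGroupId("record-index-bucket-", 10));  // record-index-bucket-010
    }
}
```

Without padding, `"...-10"` sorts before `"...-2"` in a plain string sort, which is the consistency issue the comment is pointing at.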
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClie
  @Override
  protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
    ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-   JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+   JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
-     writeClient.startCommitWithTime(instantTime);
+     if (!metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+       // if this is a new commit being applied to metadata for the first time
+       writeClient.startCommitWithTime(instantTime);
+     } else {
+       // this code path refers to a re-attempted commit that got committed to metadata, but failed in dataset.
+       // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+       // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+       // are upserts to metadata table and so only a new delta commit will be created.
+       // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+       // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+       // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+       HoodieInstant alreadyCompletedInstant = metaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+       FSUtils.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), alreadyCompletedInstant);
+       metaClient.reloadActiveTimeline();
+     }
    List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect();
    statuses.forEach(writeStatus -> {
      if (writeStatus.hasErrors()) {
        throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
      }
    });
-   // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
-   // delta commits synced over will not have an instant time lesser than the last completed instant on the
-   // metadata table.
-   if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
-     writeClient.compact(instantTime + "001");
-   }
-   writeClient.clean(instantTime + "002");
+
+   // reload timeline
+   metaClient.reloadActiveTimeline();
+
+   compactIfNecessary(writeClient, instantTime);
+   doClean(writeClient, instantTime);
  }
  // Update total size of the metadata and count of base/log files
- metrics.ifPresent(m -> {
-   try {
-     Map<String, String> stats = m.getStats(false, metaClient, metadata);
-     m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
-         Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
-         Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
-         Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
-   } catch (HoodieIOException e) {
-     LOG.error("Could not publish metadata size metrics", e);
-   }
- });
+ metrics.ifPresent(m -> m.updateSizeMetrics(metaClient, metadata));
}
/**
- * Tag each record with the location.
+ * Perform a compaction on the Metadata Table.
+ *
+ * Cases to be handled:
+ * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+ *    a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
 *
- * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
- * base file.
+ * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
+ *    deltacommit.
 */
- private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
-   HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, engineContext);
-   TableFileSystemView.SliceView fsView = table.getSliceView();
-   List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-       .map(FileSlice::getBaseFile)
-       .filter(Option::isPresent)
-       .map(Option::get)
-       .collect(Collectors.toList());
-
-   // All the metadata fits within a single base file
-   if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-     if (baseFiles.size() > 1) {
-       throw new HoodieMetadataException("Multiple base files found in metadata partition");
-     }
+ private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
+   String latestDeltacommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+       .get().getTimestamp();
+   List<HoodieInstant> pendingInstants = datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
Review comment:
lets review naming for clarity across the board. lets name everything using `data` and `metadata` prefixes.
`dataMetaClient` `metadataMetaClient`
`dataPendingInstants`
`metadataLatestDeltaCommitTime` and so forth.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
  protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
    return SparkHoodieIndex.createIndex(config);
  }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated calls to getMetadataWriter(). Please remove memoization if
Review comment:
lets get rid of this commentary. :)
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
  protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
    return SparkHoodieIndex.createIndex(config);
  }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated calls to getMetadataWriter(). Please remove memoization if
+        // that's not the case.
+        try {
+          if (!config.isMetadataTableEnabled() || !metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+            isMetadataTableAvailable = false;
+          } else {
+            isMetadataTableAvailable = true;
+          }
+        } catch (IOException e) {
+          throw new HoodieMetadataException("Could not instantiate metadata table writer", e);
Review comment:
this error message is misleading. all we are doing here is checking for existence, no?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
  protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
    return SparkHoodieIndex.createIndex(config);
  }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated calls to getMetadataWriter(). Please remove memoization if
+        // that's not the case.
+        try {
+          if (!config.isMetadataTableEnabled() || !metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+            isMetadataTableAvailable = false;
+          } else {
+            isMetadataTableAvailable = true;
+          }
+        } catch (IOException e) {
+          throw new HoodieMetadataException("Could not instantiate metadata table writer", e);
+        }
+      }
+    }
+    if (isMetadataTableAvailable) {
Review comment:
`Option.of(isMetadataTableAvailable).filter(p -> p).map(p -> SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context))`?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
##########
@@ -56,6 +56,8 @@
String UNKNOWN_WRITE_TOKEN = "1-0-1";
+ String DEFAULT_WRITE_TOKEN_METADATA_PARTITION = "0-0-0";
Review comment:
Please, let's never refer to higher-level functionality/stack (i.e. metadata) in a lower-level class, or in member names/comments.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext
engineContext, HoodieTableMetaClie
@Override
protected void commit(List<HoodieRecord> records, String partitionName,
String instantTime) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to
as it is not enabled");
- JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+ JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
- writeClient.startCommitWithTime(instantTime);
+ if
(!metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime))
{
+ // if this is a new commit being applied to metadata for the first time
+ writeClient.startCommitWithTime(instantTime);
+ } else {
+ // this code path refers to a re-attempted commit that got committed
to metadata, but failed in dataset.
+ // for eg, lets say compaction c1 on 1st attempt succeeded in metadata
table and failed before committing to datatable.
+ // when retried again, data table will first rollback pending
compaction. these will be applied to metadata table, but all changes
+ // are upserts to metadata table and so only a new delta commit will
be created.
+ // once rollback is complete, compaction will be retried again, which
will eventually hit this code block where the respective commit is
+ // already part of completed commit. So, we have to manually remove
the completed instant and proceed.
+ // and it is for the same reason we enabled
withAllowMultiWriteOnSameInstant for metadata table.
+ HoodieInstant alreadyCompletedInstant =
metaClient.getActiveTimeline().filterCompletedInstants().filter(entry ->
entry.getTimestamp().equals(instantTime)).lastInstant().get();
+ FSUtils.deleteInstantFile(metaClient.getFs(),
metaClient.getMetaPath(), alreadyCompletedInstant);
+ metaClient.reloadActiveTimeline();
+ }
List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD,
instantTime).collect();
statuses.forEach(writeStatus -> {
if (writeStatus.hasErrors()) {
throw new HoodieMetadataException("Failed to commit metadata table
records at instant " + instantTime);
}
});
- // trigger cleaning, compaction, with suffixes based on the same instant
time. This ensures that any future
- // delta commits synced over will not have an instant time lesser than
the last completed instant on the
- // metadata table.
- if (writeClient.scheduleCompactionAtInstant(instantTime + "001",
Option.empty())) {
- writeClient.compact(instantTime + "001");
- }
- writeClient.clean(instantTime + "002");
+
+ // reload timeline
+ metaClient.reloadActiveTimeline();
+
+ compactIfNecessary(writeClient, instantTime);
+ doClean(writeClient, instantTime);
}
// Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> {
-      try {
-        Map<String, String> stats = m.getStats(false, metaClient, metadata);
-        m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
-            Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
-            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
-            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
-      } catch (HoodieIOException e) {
-        LOG.error("Could not publish metadata size metrics", e);
-      }
-    });
+ metrics.ifPresent(m -> m.updateSizeMetrics(metaClient, metadata));
}
/**
- * Tag each record with the location.
+ * Perform a compaction on the Metadata Table.
+ *
+ * Cases to be handled:
+   * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+   * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
    *
-   * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
-   * base file.
+   * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
+   * deltacommit.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
-    HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, engineContext);
- TableFileSystemView.SliceView fsView = table.getSliceView();
- List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
- .map(FileSlice::getBaseFile)
- .filter(Option::isPresent)
- .map(Option::get)
- .collect(Collectors.toList());
-
- // All the metadata fits within a single base file
- if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
- if (baseFiles.size() > 1) {
-        throw new HoodieMetadataException("Multiple base files found in metadata partition");
- }
+  private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
+    String latestDeltacommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+        .get().getTimestamp();
+    List<HoodieInstant> pendingInstants = datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+    if (!pendingInstants.isEmpty()) {
+      LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
+          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+      return;
+ return;
}
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
- String fileId;
- String instantTime;
- if (!baseFiles.isEmpty()) {
- fileId = baseFiles.get(0).getFileId();
- instantTime = baseFiles.get(0).getCommitTime();
- } else {
- // If there is a log file then we can assume that it has the data
-      List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
- .map(FileSlice::getLatestLogFile)
- .filter(Option::isPresent)
- .map(Option::get)
- .collect(Collectors.toList());
- if (logFiles.isEmpty()) {
- // No base and log files. All are new inserts
- return jsc.parallelize(records, 1);
- }
-
- fileId = logFiles.get(0).getFileId();
- instantTime = logFiles.get(0).getBaseCommitTime();
+    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    final String compactionInstantTime = latestDeltacommitTime + "001";
+    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+      writeClient.compact(compactionInstantTime);
+ writeClient.compact(compactionInstantTime);
}
+ }
-    return jsc.parallelize(records, 1).map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId)));
+ private void doClean(SparkRDDWriteClient writeClient, String instantTime) {
+    // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+ writeClient.clean(instantTime + "002");
+ }
+
+ /**
+ * Tag each record with the location in the given partition.
+ *
+   * The record is tagged with respective bucket's file location based on its record key.
+   */
+  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int bucketCount) {
+    List<FileSlice> buckets = HoodieTableMetadataUtil.loadPartitionBucketsWithLatestFileSlices(metaClient, partitionName);
Review comment:
rename? all it's doing is fetching fileSlices. no bucketing terms
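As context for the instant-time suffixes used in `compactIfNecessary` and `doClean` above: the scheme relies on instant times ordering lexicographically, so appending `"001"`/`"002"` places the compaction and clean instants after the deltacommit that triggered them, while any strictly greater future timestamp still sorts after both. A minimal self-contained sketch (hypothetical timestamp values; plain `String` comparison standing in for timeline ordering):

```java
public class InstantSuffixDemo {
  public static void main(String[] args) {
    String deltaCommit = "20210927101530";          // hypothetical deltacommit instant time
    String compactionInstant = deltaCommit + "001"; // compaction suffix from the diff above
    String cleanInstant = deltaCommit + "002";      // clean suffix from the diff above

    // A longer string sharing the prefix sorts after the prefix itself,
    // so both service instants follow the deltacommit they are based on.
    System.out.println(deltaCommit.compareTo(compactionInstant) < 0); // true
    System.out.println(compactionInstant.compareTo(cleanInstant) < 0); // true

    // A future deltacommit with a strictly greater timestamp sorts after both.
    String laterDeltaCommit = "20210927101531";
    System.out.println(cleanInstant.compareTo(laterDeltaCommit) < 0); // true
  }
}
```

This is only an illustration of the ordering argument in the comments above, not Hudi's actual timeline code.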
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -188,6 +189,7 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
         lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
     try {
       preCommit(instantTime, metadata);
+      table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime));
Review comment:
I still think we should call this from Spark's `preCommit()`. We are supposed to have all the code that needs to run before committing there. `preCommit` is not meant for conflict resolution; it's the other way around: conflict resolution had to be put there, just like how metadata writing should also go there. Introducing these one-offs just makes the code harder to maintain over the longer term.
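For illustration of the compaction guard added in `compactIfNecessary` earlier in this PR: compaction is skipped whenever any dataset instant that is still inflight or requested has a timestamp before the latest metadata-table deltacommit. A minimal self-contained sketch (hypothetical timestamps; plain strings standing in for `HoodieInstant`):

```java
import java.util.List;
import java.util.stream.Collectors;

public class CompactionGuardDemo {
  public static void main(String[] args) {
    // hypothetical latest completed deltacommit on the metadata table
    String latestDeltacommitTime = "20210927101530";

    // hypothetical pending (inflight/requested) instants on the dataset timeline
    List<String> pendingInstants = List.of("20210927101010", "20210927102000");

    // instants strictly before the latest deltacommit block compaction,
    // mirroring findInstantsBefore(latestDeltacommitTime) in the diff
    List<String> blocking = pendingInstants.stream()
        .filter(t -> t.compareTo(latestDeltacommitTime) < 0)
        .collect(Collectors.toList());

    if (!blocking.isEmpty()) {
      // in the real code this is LOG.info followed by an early return
      System.out.println("Cannot compact metadata table, blocking instants: " + blocking);
    }
  }
}
```

The second pending instant (`20210927102000`) does not block compaction because it is newer than the latest deltacommit; only older pending work invalidates a compacted base file at that time.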
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]