vinothchandar commented on a change in pull request #3590:
URL: https://github.com/apache/hudi/pull/3590#discussion_r715912832
##########
File path:
hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
##########
@@ -440,14 +440,6 @@ public String compareCommits(@CliOption(key = {"path"},
help = "Path of the tabl
}
}
- @CliCommand(value = "commits sync", help = "Compare commits with another
Hoodie table")
- public String syncCommits(@CliOption(key = {"path"}, help = "Path of the
table to compare to") final String path) {
- HoodieCLI.syncTableMetadata =
HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
Review comment:
I understand why this is being removed, but we may want the ability to
add a new metadata partition in the background, say via a `CREATE INDEX`
statement. While that partition is being built, new commits could arrive
that need to be synced. Let's file a subtask to deal with this.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -188,6 +189,7 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
lastCompletedTxnAndMetadata.isPresent() ?
Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
preCommit(instantTime, metadata);
+ table.getMetadataWriter().ifPresent(w ->
((HoodieTableMetadataWriter)w).update(metadata, instantTime));
Review comment:
why can't this be inside `preCommit()`?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -273,6 +275,7 @@ public void bootstrap(Option<Map<String, String>>
extraMetadata) {
public void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
+ table.getHoodieView().sync();
Review comment:
need to remove?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -81,13 +84,16 @@
public abstract class HoodieBackedTableMetadataWriter implements
HoodieTableMetadataWriter {
private static final Logger LOG =
LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
+ private static final Integer MAX_BUCKET_COUNT = 9999;
+ private static final String BUCKET_PREFIX = "bucket-";
Review comment:
Instead of `bucket-`, what if we use the partition name, i.e.
`files-xxxx`?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -309,6 +303,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext
engineContext, Hoodi
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
initTableMetadata();
+ initializeBuckets(datasetMetaClient,
MetadataPartitionType.FILES.partitionPath(), createInstantTime, 1);
Review comment:
initializeFileGroups?
##########
File path:
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
##########
@@ -71,6 +72,7 @@ public void init() throws Exception {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2,
2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
3).build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
Review comment:
Let's track all of these and eventually fix them.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -298,28 +306,38 @@ protected void completeCompaction(HoodieCommitMetadata
metadata, JavaRDD<WriteSt
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction");
List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- finalizeWrite(table, compactionCommitTime, writeStats);
- LOG.info("Committing Compaction " + compactionCommitTime + ". Finished
with result " + metadata);
- SparkCompactHelpers.newInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
- WriteMarkersFactory.get(config.getMarkersType(), table,
compactionCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
- try {
-
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
- durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
- } catch (ParseException e) {
- throw new HoodieCommitException("Commit time is not of valid format.
Failed to commit compaction "
- + config.getBasePath() + " at time " + compactionCommitTime, e);
+ try {
+ // TODO: check if we need
HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient,
config) here.
+ this.txnManager.beginTransaction(Option.of(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
compactionCommitTime)), Option.empty());
+ // Do not do any conflict resolution here as we do with regular writes.
We take the lock here to ensure all writes to metadata table happens within a
+ // single lock (single writer). Because more than one write to metadata
table will result in conflicts since all of them updates the same partition.
+ table.getMetadataWriter().ifPresent(w -> w.update(metadata,
compactionCommitTime));
+ // commit to data table after committing to metadata
table.FlinkHoodieBackedTableMetadataWriter
Review comment:
mentioning Flink in Spark module?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -499,8 +510,20 @@ public void update(HoodieRestoreMetadata restoreMetadata,
String instantTime) {
@Override
public void update(HoodieRollbackMetadata rollbackMetadata, String
instantTime) {
if (enabled) {
- List<HoodieRecord> records =
HoodieTableMetadataUtil.convertMetadataToRecords(metaClient.getActiveTimeline(),
- rollbackMetadata, instantTime, metadata.getUpdateTime());
+ // Is this rollback of an instant that has been synced to the metadata
table?
+ String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
Review comment:
This `get(0)` is something to fix.
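
The comment does not say what the fix should be. One possible reading is that
the rolled-back list may be empty or hold more than one instant, so looking at
only the first element is fragile. A minimal sketch under that assumption, in
plain Java; `RollbackCheckSketch`, `syncedRollbacks` and the plain `String`
instant times are hypothetical stand-ins, not Hudi APIs:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Hudi: checks every rolled-back instant
// instead of only the first element of the list.
final class RollbackCheckSketch {
  private RollbackCheckSketch() {}

  /** Returns the rolled-back instants that were already synced, in input order. */
  static List<String> syncedRollbacks(List<String> commitsRolledBack, Set<String> syncedInstants) {
    // An empty input simply yields an empty result, so there is no
    // IndexOutOfBoundsException as there would be with get(0).
    return commitsRolledBack.stream()
        .filter(syncedInstants::contains)
        .collect(Collectors.toList());
  }
}
```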
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -124,6 +127,19 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K,
O> table, HoodieInstan
}
}
+ /**
+ * Update metadata table if available. Any update to metadata table happens
within data table lock.
+ * @param cleanMetadata intance of {@link HoodieCleanMetadata} to be applied
to metadata.
+ */
+ private void writeToMetadata(HoodieCleanMetadata cleanMetadata) {
Review comment:
writeMetadata
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +396,86 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize buckets for a partition. For file listing, we just have one
bucket. But for record level index, we might have N number of buckets
+ * per partition. Technically speaking buckets here map to FileGroups in
Hudi.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * Each bucket maps to FileGroups in Hudi and is represented with the
following format:
+ * bucket-ABCD
+ * where ABCD are digits. This allows up to 9999 buckets.
+ *
+ * Example:
+ * bucket-0001
+ * bucket-0002
*/
- private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it
is not enabled");
- // (re) init the metadata for reading.
- initTableMetadata();
- try {
- List<HoodieInstant> instantsToSync =
metadata.findInstantsToSyncForWriter();
- if (instantsToSync.isEmpty()) {
- return;
- }
-
- LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
-
- // Read each instant in order and sync it to metadata table
- for (HoodieInstant instant : instantsToSync) {
- LOG.info("Syncing instant " + instant + " to metadata table");
-
- Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient,
- metaClient.getActiveTimeline(), instant, metadata.getUpdateTime());
- if (records.isPresent()) {
- commit(records.get(), MetadataPartitionType.FILES.partitionPath(),
instant.getTimestamp());
- }
+ private void initializeBuckets(HoodieTableMetaClient datasetMetaClient,
String partition, String instantTime,
+ int bucketCount) throws IOException {
+ ValidationUtils.checkArgument(bucketCount <= MAX_BUCKET_COUNT, "Maximum "
+ MAX_BUCKET_COUNT + " buckets are supported.");
+
+ final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+ blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+ // Archival of data table has a dependency on compaction(base files) in
metadata table.
+ // It is assumed that as of time Tx of base instant (/compaction time) in
metadata table,
+ // all commits in data table is in sync with metadata table. So, we always
create start with log file for any bucket.
+ // but we have to work on relaxing that in future :
https://issues.apache.org/jira/browse/HUDI-2458
+ final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0],
blockHeader);
+
+ LOG.info(String.format("Creating %d buckets for partition %s with base
fileId %s at instant time %s",
+ bucketCount, partition, BUCKET_PREFIX, instantTime));
+ for (int i = 0; i < bucketCount; ++i) {
+ final String bucketFileId = String.format("%s%04d", BUCKET_PREFIX, i +
1);
+ try {
+ // since all shards are initialized in driver, we don't need to create
a random write token.
+ String writeToken = FSUtils.makeWriteToken(0, 0, 0);
Review comment:
any constants to reuse?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -126,8 +129,13 @@ public boolean commit(String instantTime,
JavaRDD<WriteStatus> writeStatuses, Op
@Override
protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> createTable(HoodieWriteConfig config,
-
Configuration hadoopConf) {
- return HoodieSparkTable.create(config, context);
+
Configuration hadoopConf,
+
boolean refreshTimeline) {
Review comment:
format
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +396,86 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize buckets for a partition. For file listing, we just have one
bucket. But for record level index, we might have N number of buckets
+ * per partition. Technically speaking buckets here map to FileGroups in
Hudi.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * Each bucket maps to FileGroups in Hudi and is represented with the
following format:
+ * bucket-ABCD
+ * where ABCD are digits. This allows up to 9999 buckets.
+ *
+ * Example:
+ * bucket-0001
+ * bucket-0002
*/
- private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it
is not enabled");
- // (re) init the metadata for reading.
- initTableMetadata();
- try {
- List<HoodieInstant> instantsToSync =
metadata.findInstantsToSyncForWriter();
- if (instantsToSync.isEmpty()) {
- return;
- }
-
- LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
-
- // Read each instant in order and sync it to metadata table
- for (HoodieInstant instant : instantsToSync) {
- LOG.info("Syncing instant " + instant + " to metadata table");
-
- Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient,
- metaClient.getActiveTimeline(), instant, metadata.getUpdateTime());
- if (records.isPresent()) {
- commit(records.get(), MetadataPartitionType.FILES.partitionPath(),
instant.getTimestamp());
- }
+ private void initializeBuckets(HoodieTableMetaClient datasetMetaClient,
String partition, String instantTime,
Review comment:
partitionPath
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -81,13 +84,16 @@
public abstract class HoodieBackedTableMetadataWriter implements
HoodieTableMetadataWriter {
private static final Logger LOG =
LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
+ private static final Integer MAX_BUCKET_COUNT = 9999;
Review comment:
Let's make this 99999? Do we even need this? It is only used to format
the bucket ID, right? Can't we figure this out dynamically, i.e. if
`numBuckets=9999`, then you know four digits are needed and the bucket
string should have four characters.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +396,86 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize buckets for a partition. For file listing, we just have one
bucket. But for record level index, we might have N number of buckets
Review comment:
We should avoid leaking buckets everywhere. Unless we cannot explain the
code with just file groups, we should not be using buckets IMO.
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java
##########
@@ -49,7 +49,9 @@ public void run(HoodieTableMetaClient metaClient,
HoodieTableVersion toVersion,
return new ZeroToOneUpgradeHandler().upgrade(config, context,
instantTime);
} else if (fromVersion == HoodieTableVersion.ONE && toVersion ==
HoodieTableVersion.TWO) {
return new OneToTwoUpgradeHandler().upgrade(config, context,
instantTime);
- } else {
+ } else if (fromVersion == HoodieTableVersion.TWO && toVersion ==
HoodieTableVersion.THREE) {
Review comment:
Let's add a JIRA to track everything that finally needs to be done for the
upgrade-downgrade story here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
##########
@@ -31,13 +30,16 @@
*/
public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable
{
+ // Update the metadata table due to a COMMIT operation
Review comment:
javadoc
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -298,28 +306,38 @@ protected void completeCompaction(HoodieCommitMetadata
metadata, JavaRDD<WriteSt
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction");
List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- finalizeWrite(table, compactionCommitTime, writeStats);
- LOG.info("Committing Compaction " + compactionCommitTime + ". Finished
with result " + metadata);
- SparkCompactHelpers.newInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
- WriteMarkersFactory.get(config.getMarkersType(), table,
compactionCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
- try {
-
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
- durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
- } catch (ParseException e) {
- throw new HoodieCommitException("Commit time is not of valid format.
Failed to commit compaction "
- + config.getBasePath() + " at time " + compactionCommitTime, e);
+ try {
+ // TODO: check if we need
HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient,
config) here.
+ this.txnManager.beginTransaction(Option.of(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
compactionCommitTime)), Option.empty());
Review comment:
Let's shrink it to just the part needed within a lock.
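
A minimal sketch of a narrowed critical section, using a plain
`java.util.concurrent.locks.ReentrantLock` in place of Hudi's transaction
manager. Which steps really need the lock is an assumption here (the metadata
table update and the completion of the instant); the class and step names are
hypothetical and only record whether the lock was held:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the compaction commit path; not Hudi code.
final class NarrowLockSketch {
  private final ReentrantLock lock = new ReentrantLock();
  final List<String> log = new ArrayList<>();

  // Records a step name together with whether the lock was held for it.
  private void step(String name) {
    log.add(name + (lock.isHeldByCurrentThread() ? ":locked" : ":unlocked"));
  }

  void completeCompaction() {
    // Work that does not write to the metadata table stays outside the lock.
    step("collectWriteStats");
    lock.lock();
    try {
      // Assumed to be the only steps that require a single writer.
      step("updateMetadataTable");
      step("completeInstant");
    } finally {
      // Always release, even if a step throws.
      lock.unlock();
    }
    step("deleteMarkers");
    step("emitMetrics");
  }
}
```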
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -339,6 +357,7 @@ protected void completeCompaction(HoodieCommitMetadata
metadata, JavaRDD<WriteSt
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String
clusteringInstant, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+ table.getHoodieView().sync();
Review comment:
More instances to clean up?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -273,6 +275,7 @@ public void bootstrap(Option<Map<String, String>>
extraMetadata) {
public void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
+ table.getHoodieView().sync();
Review comment:
can this call getTableAnd.... ?