nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r818036625
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -1301,4 +1369,33 @@ public void close() {
this.heartbeatClient.stop();
this.txnManager.close();
}
+
+ private void setWriteTimer(HoodieTable<T, I, K, O> table) {
+ String commitType = table.getMetaClient().getCommitActionType();
+ if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
+ writeTimer = metrics.getCommitCtx();
+ } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+ writeTimer = metrics.getDeltaCommitCtx();
+ }
+ }
+
+ private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+ UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
+
+ if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
Review comment:
As per master, this code is available only in the Spark engine; Flink and
Java do not have it. Even if we wish to unify, I would do it in a separate
patch and get it reviewed by experts who have worked on it. Can you move
this to SparkRDDWriteClient for now?
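A minimal sketch of the suggested layout, assuming a heavily simplified model: the engine-agnostic initTable stays final in the base client, and the upgrade check moves into the Spark client's doInitTable override, so it still runs while the (simulated) transaction is open. BaseWriteClient, SparkWriteClient, JavaWriteClient and the steps log are hypothetical stand-ins, not Hudi classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-ins for the Hudi write clients (not the real Hudi API).
abstract class BaseWriteClient {
  // Records which steps ran, so the control flow can be inspected.
  final List<String> steps = new ArrayList<>();

  // Engine-agnostic entry point; declared final so subclasses cannot override it.
  final String initTable(String instantTime) {
    steps.add("begin-txn"); // stands in for txnManager.beginTransaction()
    try {
      return doInitTable(instantTime);
    } finally {
      steps.add("end-txn"); // stands in for txnManager.endTransaction()
    }
  }

  // Engine-specific hook, invoked while the simulated transaction is open.
  protected abstract String doInitTable(String instantTime);
}

// Only the Spark client runs the upgrade/downgrade check, mirroring master as described above.
class SparkWriteClient extends BaseWriteClient {
  @Override
  protected String doInitTable(String instantTime) {
    steps.add("upgrade-check"); // where the tryUpgrade(...) logic would move to
    return "spark-table@" + instantTime;
  }
}

// The Java client keeps no upgrade/downgrade step.
class JavaWriteClient extends BaseWriteClient {
  @Override
  protected String doInitTable(String instantTime) {
    return "java-table@" + instantTime;
  }
}
```

Keeping initTable final means the lock handling lives in one place, while each engine decides in its own hook whether an upgrade step exists at all.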
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -1246,17 +1243,88 @@ public HoodieMetrics getMetrics() {
}
/**
- * Get HoodieTable and init {@link Timer.Context}.
+ * Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary
+ * bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped)
*
- * @param operationType write operation type
+ * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
+ * NOT REQUIRING EXTERNAL SYNCHRONIZATION
+ *
+ * @param metaClient instance of {@link HoodieTableMetaClient}
* @param instantTime current inflight instant time
- * @return HoodieTable
+ * @return instantiated {@link HoodieTable}
*/
- protected abstract HoodieTable<T, I, K, O> getTableAndInitCtx(WriteOperationType operationType, String instantTime);
+ protected abstract HoodieTable<T, I, K, O> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
/**
- * Sets write schema from last instant since deletes may not have schema set in the config.
+ * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
+ * operations such as:
+ *
+ * NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
+ * {@link #doInitTable(HoodieTableMetaClient, Option<String>)} instead
+ *
+ * <ul>
+ * <li>Checking whether upgrade/downgrade is required</li>
+ * <li>Bootstrapping Metadata Table (if required)</li>
+ * <li>Initializing metrics contexts</li>
+ * </ul>
*/
+ protected final HoodieTable<T, I, K, O> initTable(WriteOperationType operationType, Option<String> instantTime) {
+ HoodieTableMetaClient metaClient = createMetaClient(true);
+ // Setup write schemas for deletes
+ if (operationType == WriteOperationType.DELETE) {
+ setWriteSchemaForDeletes(metaClient);
+ }
+
+ HoodieTable<T, I, K, O> table;
+
+ this.txnManager.beginTransaction();
+ try {
+ tryUpgrade(metaClient, instantTime);
+ table = doInitTable(metaClient, instantTime);
+ } finally {
+ this.txnManager.endTransaction();
+ }
+
+ // Validate table properties
+ metaClient.validateTableProperties(config.getProps(), operationType);
+ // Make sure that FS View is in sync
+ table.getHoodieView().sync();
+
+ switch (operationType) {
+ case INSERT:
+ case INSERT_PREPPED:
+ case UPSERT:
+ case UPSERT_PREPPED:
+ case BULK_INSERT:
+ case BULK_INSERT_PREPPED:
+ case INSERT_OVERWRITE:
+ case INSERT_OVERWRITE_TABLE:
+ setWriteTimer(table);
+ break;
+ case CLUSTER:
+ clusteringTimer = metrics.getClusteringCtx();
+ break;
+ case COMPACT:
+ compactionTimer = metrics.getCompactionCtx();
+ break;
+ default:
+ }
+
+ return table;
+ }
+
+ protected <R> R withLock(Supplier<R> s) {
Review comment:
Is this used anywhere?
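For reference, a helper of this shape usually just wraps a begin/end pair around a supplier, which is what initTable currently spells out with txnManager.beginTransaction() and txnManager.endTransaction(). The PR's body for withLock is not shown here, so the following is a hypothetical sketch that uses a java.util.concurrent.locks.ReentrantLock as a stand-in for the transaction manager; LockingClient and isLocked are illustrative names.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical sketch: a ReentrantLock stands in for Hudi's transaction manager.
class LockingClient {
  private final ReentrantLock lock = new ReentrantLock();

  // Runs the supplier while holding the lock and always releases it, even if the supplier throws.
  protected <R> R withLock(Supplier<R> s) {
    lock.lock();
    try {
      return s.get();
    } finally {
      lock.unlock();
    }
  }

  // Exposed only so the behavior can be observed from outside.
  boolean isLocked() {
    return lock.isLocked();
  }
}
```

If the helper has no callers, it is dead code; if it is kept, the explicit begin/end block in initTable is the kind of code it would replace.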
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Java upgrade and downgrade helper
+ */
+public class JavaUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
Review comment:
If I am not wrong, Java did not have any upgrade/downgrade step prior to
this patch, and now we are adding it. Is that right?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -392,28 +405,53 @@ private HoodieMetadataColumnStats combineColumnStatsMetadata(HoodieMetadataPaylo
return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
}
- private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+ private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+ // First, add all files listed in the previous record
if (previousRecord.filesystemMetadata != null) {
combinedFileInfo.putAll(previousRecord.filesystemMetadata);
}
+ // Second, merge in the files listed in the new record
if (filesystemMetadata != null) {
- filesystemMetadata.forEach((filename, fileInfo) -> {
- // If the filename wasnt present then we carry it forward
- if (!combinedFileInfo.containsKey(filename)) {
- combinedFileInfo.put(filename, fileInfo);
- } else {
- if (fileInfo.getIsDeleted()) {
- // file deletion
- combinedFileInfo.remove(filename);
- } else {
- // file appends.
- combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
- return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
- });
- }
- }
+ validatePayload(type, filesystemMetadata);
+
+ filesystemMetadata.forEach((key, fileInfo) -> {
+ combinedFileInfo.merge(key, fileInfo,
+ // Combine previous record w/ the new one, new records taking precedence over
+ // the old one
+ //
+ // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+ // record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+ // listing as well as drop the tombstone itself.
+ // However, if file is not present in the previous record we have to persist tombstone
+ // record in the listing to make sure we carry forward information that this file
+ // was deleted. This special case could occur since the merging flow is 2-stage:
+ // - First we merge records from all of the delta log-files
+ // - Then we merge records from base-files with the delta ones (coming as a result
+ // of the previous step)
+ (oldFileInfo, newFileInfo) ->
+ // NOTE: We can’t assume that MT update records will be ordered the same way as actual
+ // FS operations (since they are not atomic), therefore MT record merging should be a
+ // _commutative_ & _associative_ operation (ie one that would work even in case records
+ // will get re-ordered), which is
+ // - Possible for file-sizes (since file-sizes will ever grow, we can simply
+ // take max of the old and new records)
+ // - Not possible for is-deleted flags*
+ //
+ // *However, we’re assuming that the case of concurrent write and deletion of the same
+ // file is _impossible_ -- it would only be possible with concurrent upsert and
+ // rollback operation (affecting the same log-file), which is implausible, b/c either
+ // of the following have to be true:
+ // - We’re appending to failed log-file (then the other writer is trying to
+ // rollback it concurrently, before its own write)
+ // - Rollback (of completed instant) is running concurrently with append (meaning
+ // that restore is running concurrently with a write, which is also not supported
Review comment:
Thanks for the comments. They will definitely be useful, even for us, down
the line.
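The merge rules spelled out in the NOTE comments can be sketched with java.util.Map.merge, which stores the new value as-is when the key is absent and removes the entry when the remapping function returns null. This is a simplified illustration, not the PR's combiner (whose body is cut off in this excerpt): FileInfo and ListingMerger are hypothetical stand-ins for HoodieMetadataFileInfo and combineFileSystemMetadata, and sizes are combined with max as the comment proposes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for HoodieMetadataFileInfo: a file size plus a tombstone flag.
final class FileInfo {
  final long size;
  final boolean deleted;

  FileInfo(long size, boolean deleted) {
    this.size = size;
    this.deleted = deleted;
  }
}

final class ListingMerger {
  // Merges the new listing into the previous one, new records taking precedence.
  static Map<String, FileInfo> combine(Map<String, FileInfo> previous, Map<String, FileInfo> next) {
    Map<String, FileInfo> combined = new HashMap<>(previous);
    next.forEach((name, info) ->
        // Map.merge stores `info` as-is when `name` is absent (so an unmatched tombstone is
        // carried forward) and removes the entry when the function returns null.
        combined.merge(name, info, (oldInfo, newInfo) ->
            newInfo.deleted
                ? null // tombstone cancels the previously listed file and is itself dropped
                : new FileInfo(Math.max(oldInfo.size, newInfo.size), false))); // sizes only grow
    return combined;
  }
}
```

Because max is commutative and associative, the size of a file present in both listings does not depend on which record is applied first. The tombstone handling remains order-sensitive, which is why the comment relies on concurrent write and deletion of the same file being impossible.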
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -291,15 +302,15 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
throw new HoodieException("Cannot bootstrap the table in multi-writer mode");
}
- HoodieTable<T, I, K, O> table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
+ HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
rollbackFailedBootstrap();
table.bootstrap(context, extraMetadata);
}
/**
* Main API to rollback failed bootstrap.
*/
- public void rollbackFailedBootstrap() {
Review comment:
Why move it from public to protected? We can leave it as is. I don't
think anyone outside has their own write client implementing these
public methods, but I'm curious why we are changing it now.
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -397,11 +396,11 @@ public void completeCompaction(
}
@Override
- protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
- HoodieTableMetaClient metaClient = createMetaClient(true);
+ protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
- .run(HoodieTableVersion.current(), instantTime);
- return getTableAndInitCtx(metaClient, operationType);
+ .run(HoodieTableVersion.current(), instantTime.orElse(null));
Review comment:
Aren't we calling upgrade/downgrade twice, once in tryUpgrade and once
here within doInitTable?
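To make the question concrete, here is a hypothetical counter-based model, assuming an upgrade is always needed so that every call actually runs; UpgradeRunner stands in for UpgradeDowngrade, and all class names are illustrative only. With the PR's shape (base-class tryUpgrade plus a run inside the Flink doInitTable), each initTable call runs the upgrade twice; keeping a single call site runs it once.

```java
// Hypothetical model of the review question; UpgradeRunner stands in for UpgradeDowngrade.
class UpgradeRunner {
  int runs = 0;

  // Counts invocations instead of performing a real table upgrade.
  void run(String instantTime) {
    runs++;
  }
}

abstract class UpgradeClient {
  final UpgradeRunner upgrader = new UpgradeRunner();

  // Mirrors the PR's shape: the base class calls tryUpgrade before the engine-specific hook.
  final void initTable(String instantTime) {
    tryUpgrade(instantTime);
    doInitTable(instantTime);
  }

  private void tryUpgrade(String instantTime) {
    upgrader.run(instantTime);
  }

  protected abstract void doInitTable(String instantTime);
}

// As in the Flink diff: the hook runs the upgrade again, so it executes twice per initTable call.
class FlinkClientAsInPr extends UpgradeClient {
  @Override
  protected void doInitTable(String instantTime) {
    upgrader.run(instantTime);
  }
}

// Single call site: the hook leaves the upgrade to the base class.
class FlinkClientSingleCall extends UpgradeClient {
  @Override
  protected void doInitTable(String instantTime) {
    // Table construction only; no second upgrade run.
  }
}
```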