This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1ceb03a47a4 [HUDI-6665] Fixing metadata writer close (#9385)
1ceb03a47a4 is described below
commit 1ceb03a47a49dce623703e0b17ee235603ca5f99
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Aug 8 01:23:17 2023 -0400
[HUDI-6665] Fixing metadata writer close (#9385)
Close the metadata writer after every invocation by wrapping each use in try-with-resources.
---
.../hudi/client/BaseHoodieTableServiceClient.java | 13 ++++-
.../apache/hudi/client/BaseHoodieWriteClient.java | 31 ++++++++----
.../hudi/table/action/BaseActionExecutor.java | 55 ++++++++++++++++++++--
.../table/action/index/RunIndexActionExecutor.java | 54 +++++++++++----------
.../functional/TestExternalPathHandling.java | 40 ++++++++--------
5 files changed, 137 insertions(+), 56 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 141cc9b5be6..e55fb045e1e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -699,7 +699,18 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
protected void writeTableMetadata(HoodieTable table, String instantTime,
String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus>
writeStatuses) {
checkArgument(table.isTableServiceAction(actionType, instantTime),
String.format("Unsupported action: %s.%s is not table service.", actionType,
instantTime));
context.setJobStatus(this.getClass().getSimpleName(), "Committing to
metadata table: " + config.getTableName());
- table.getMetadataWriter(instantTime).ifPresent(w ->
((HoodieTableMetadataWriter) w).update(metadata, writeStatuses, instantTime));
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
+ if (metadataWriterOpt.isPresent()) {
+ try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get())
{
+ metadataWriter.update(metadata, writeStatuses, instantTime);
+ } catch (Exception e) {
+ if (e instanceof HoodieException) {
+ throw (HoodieException) e;
+ } else {
+ throw new HoodieException("Failed to update metadata", e);
+ }
+ }
+ }
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 220393b4fb2..6b03c5234f0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -60,7 +60,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
@@ -361,7 +360,18 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
tableServiceClient.writeTableMetadata(table, instantTime, actionType,
metadata, writeStatuses);
} else {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to
metadata table: " + config.getTableName());
- table.getMetadataWriter(instantTime).ifPresent(w ->
((HoodieTableMetadataWriter) w).update(metadata, writeStatuses, instantTime));
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
+ if (metadataWriterOpt.isPresent()) {
+ try (HoodieTableMetadataWriter metadataWriter =
metadataWriterOpt.get()) {
+ metadataWriter.update(metadata, writeStatuses, instantTime);
+ } catch (Exception e) {
+ if (e instanceof HoodieException) {
+ throw (HoodieException) e;
+ } else {
+ throw new HoodieException("Failed to update metadata", e);
+ }
+ }
+ }
}
}
@@ -1016,13 +1026,18 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
try {
context.setJobStatus(this.getClass().getSimpleName(), "Dropping
partitions from metadata table: " + config.getTableName());
- table.getMetadataWriter(dropInstant).ifPresent(w -> {
- try {
- ((HoodieTableMetadataWriter)
w).dropMetadataPartitions(partitionTypes);
- } catch (IOException e) {
- throw new HoodieIndexException("Failed to drop metadata index. ", e);
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(dropInstant);
+ if (metadataWriterOpt.isPresent()) {
+ try (HoodieTableMetadataWriter metadataWriter =
metadataWriterOpt.get()) {
+ metadataWriter.dropMetadataPartitions(partitionTypes);
+ } catch (Exception e) {
+ if (e instanceof HoodieException) {
+ throw (HoodieException) e;
+ } else {
+ throw new HoodieException("Failed to drop partitions from
metadata", e);
+ }
}
- });
+ }
} finally {
this.txnManager.endTransaction(Option.of(ownerInstant));
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index 1800031f1a8..31966203660 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -28,7 +28,10 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.HoodieTable;
public abstract class BaseActionExecutor<T, I, K, O, R> implements
Serializable {
@@ -58,7 +61,18 @@ public abstract class BaseActionExecutor<T, I, K, O, R>
implements Serializable
* @param metadata commit metadata of interest.
*/
protected final void writeTableMetadata(HoodieCommitMetadata metadata,
HoodieData<WriteStatus> writeStatus, String actionType) {
- table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata,
writeStatus, instantTime));
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
+ if (metadataWriterOpt.isPresent()) {
+ try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get())
{
+ metadataWriter.update(metadata, writeStatus, instantTime);
+ } catch (Exception e) {
+ if (e instanceof HoodieException) {
+ throw (HoodieException) e;
+ } else {
+ throw new HoodieException("Failed to update metadata", e);
+ }
+ }
+ }
}
/**
@@ -66,7 +80,18 @@ public abstract class BaseActionExecutor<T, I, K, O, R>
implements Serializable
* @param metadata clean metadata of interest.
*/
protected final void writeTableMetadata(HoodieCleanMetadata metadata, String
instantTime) {
- table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata,
instantTime));
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
+ if (metadataWriterOpt.isPresent()) {
+ try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get())
{
+ metadataWriter.update(metadata, instantTime);
+ } catch (Exception e) {
+ if (e instanceof HoodieException) {
+ throw (HoodieException) e;
+ } else {
+ throw new HoodieException("Failed to apply clean commit to
metadata", e);
+ }
+ }
+ }
}
/**
@@ -74,7 +99,18 @@ public abstract class BaseActionExecutor<T, I, K, O, R>
implements Serializable
* @param metadata rollback metadata of interest.
*/
protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
- table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata,
instantTime));
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
+ if (metadataWriterOpt.isPresent()) {
+ try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get())
{
+ metadataWriter.update(metadata, instantTime);
+ } catch (Exception e) {
+ if (e instanceof HoodieException) {
+ throw (HoodieException) e;
+ } else {
+ throw new HoodieException("Failed to apply rollbacks in metadata",
e);
+ }
+ }
+ }
}
/**
@@ -82,6 +118,17 @@ public abstract class BaseActionExecutor<T, I, K, O, R>
implements Serializable
* @param metadata restore metadata of interest.
*/
protected final void writeTableMetadata(HoodieRestoreMetadata metadata) {
- table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata,
instantTime));
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
+ if (metadataWriterOpt.isPresent()) {
+ try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get())
{
+ metadataWriter.update(metadata, instantTime);
+ } catch (Exception e) {
+ if (e instanceof HoodieException) {
+ throw (HoodieException) e;
+ } else {
+ throw new HoodieException("Failed to apply restore to metadata", e);
+ }
+ }
+ }
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 93e8a8f859f..9b91167899c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieTable;
@@ -49,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -135,34 +137,38 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = null;
if (!firstTimeInitializingMetadataTable) {
// start indexing for each partition
- HoodieTableMetadataWriter metadataWriter =
table.getIndexingMetadataWriter(instantTime)
+ try (HoodieTableMetadataWriter metadataWriter =
table.getIndexingMetadataWriter(instantTime)
.orElseThrow(() -> new HoodieIndexException(String.format(
- "Could not get metadata writer to run index action for
instant: %s", instantTime)));
- // this will only build index upto base instant as generated by the
plan, we will be doing catchup later
- String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
- LOG.info("Starting Index Building with base instant: " +
indexUptoInstant);
- metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
+ "Could not get metadata writer to run index action for
instant: %s", instantTime)))) {
+ // this will only build index upto base instant as generated by the
plan, we will be doing catchup later
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ LOG.info("Starting Index Building with base instant: " +
indexUptoInstant);
+ metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
- // get remaining instants to catchup
- List<HoodieInstant> instantsToCatchup =
getInstantsToCatchup(indexUptoInstant);
- LOG.info("Total remaining instants to index: " +
instantsToCatchup.size());
+ // get remaining instants to catchup
+ List<HoodieInstant> instantsToCatchup =
getInstantsToCatchup(indexUptoInstant);
+ LOG.info("Total remaining instants to index: " +
instantsToCatchup.size());
- // reconcile with metadata table timeline
- String metadataBasePath =
getMetadataTableBasePath(table.getMetaClient().getBasePath());
- HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
- Set<String> metadataCompletedTimestamps =
getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant,
metadataMetaClient).stream()
- .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+ // reconcile with metadata table timeline
+ String metadataBasePath =
getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<String> metadataCompletedTimestamps =
getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant,
metadataMetaClient).stream()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
- // index catchup for all remaining instants with a timeout
- currentCaughtupInstant = indexUptoInstant;
- catchupWithInflightWriters(metadataWriter, instantsToCatchup,
metadataMetaClient, metadataCompletedTimestamps);
- // save index commit metadata and update table config
- finalIndexPartitionInfos = indexPartitionInfos.stream()
- .map(info -> new HoodieIndexPartitionInfo(
- info.getVersion(),
- info.getMetadataPartitionPath(),
- currentCaughtupInstant))
- .collect(Collectors.toList());
+ // index catchup for all remaining instants with a timeout
+ currentCaughtupInstant = indexUptoInstant;
+ catchupWithInflightWriters(metadataWriter, instantsToCatchup,
metadataMetaClient, metadataCompletedTimestamps);
+ // save index commit metadata and update table config
+ finalIndexPartitionInfos = indexPartitionInfos.stream()
+ .map(info -> new HoodieIndexPartitionInfo(
+ info.getVersion(),
+ info.getMetadataPartitionPath(),
+ currentCaughtupInstant))
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new HoodieMetadataException("Failed to index partition " +
Arrays.toString(indexPartitionInfos.stream()
+ .map(entry ->
entry.getMetadataPartitionPath()).collect(Collectors.toList()).toArray()));
+ }
} else {
String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
// save index commit metadata and update table config
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
index 9f28290f1ad..0785f9eea76 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
@@ -156,27 +156,29 @@ public class TestExternalPathHandling extends
HoodieClientTestBase {
cleanTime,
Option.empty(),
cleanStats);
- HoodieTableMetadataWriter hoodieTableMetadataWriter =
(HoodieTableMetadataWriter) writeClient.initTable(WriteOperationType.UPSERT,
Option.of(cleanTime)).getMetadataWriter(cleanTime).get();
- hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
-
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightClean,
- TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
- // make sure we still get the same results as before
- assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, partitionPath2.isEmpty() ? 2 : 1);
- assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+ try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
(HoodieTableMetadataWriter) writeClient.initTable(WriteOperationType.UPSERT,
Option.of(cleanTime)).getMetadataWriter(cleanTime).get()) {
+ hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
+
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightClean,
+ TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
+ // make sure we still get the same results as before
+ assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+ assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
- // trigger archiver manually
- writeClient.archive();
- // assert commit was archived
- Assertions.assertEquals(1,
metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants());
- // make sure we still get the same results as before
- assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, partitionPath2.isEmpty() ? 2 : 1);
- assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+ // trigger archiver manually
+ writeClient.archive();
+ // assert commit was archived
+ Assertions.assertEquals(1,
metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants());
+ // make sure we still get the same results as before
+ assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+ assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+ // assert that column stats are correct
+ HoodieBackedTableMetadata hoodieBackedTableMetadata = new
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(),
writeConfig.getBasePath(), true);
+ assertEmptyColStats(hoodieBackedTableMetadata, partitionPath1,
fileName1);
+ assertColStats(hoodieBackedTableMetadata, partitionPath1, fileName2);
+ assertColStats(hoodieBackedTableMetadata, partitionPath2, fileName3);
+ }
- // assert that column stats are correct
- HoodieBackedTableMetadata hoodieBackedTableMetadata = new
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(),
writeConfig.getBasePath(), true);
- assertEmptyColStats(hoodieBackedTableMetadata, partitionPath1, fileName1);
- assertColStats(hoodieBackedTableMetadata, partitionPath1, fileName2);
- assertColStats(hoodieBackedTableMetadata, partitionPath2, fileName3);
}
static Stream<Arguments> getArgs() {