yuzhaojing commented on code in PR #6732:
URL: https://github.com/apache/hudi/pull/6732#discussion_r1062133114
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -352,182 +303,23 @@ public void commitLogCompaction(String
logCompactionInstantTime, HoodieCommitMet
protected void completeLogCompaction(HoodieCommitMetadata metadata,
HoodieTable table,
String logCompactionCommitTime) {
- this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log
compaction write status and commit compaction");
- List<HoodieWriteStat> writeStats = metadata.getWriteStats();
final HoodieInstant logCompactionInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
- try {
- this.txnManager.beginTransaction(Option.of(logCompactionInstant),
Option.empty());
- preCommit(logCompactionInstant, metadata);
- finalizeWrite(table, logCompactionCommitTime, writeStats);
- // commit to data table after committing to metadata table.
- updateTableMetadata(table, metadata, logCompactionInstant);
- LOG.info("Committing Log Compaction " + logCompactionCommitTime + ".
Finished with result " + metadata);
- CompactHelpers.getInstance().completeInflightLogCompaction(table,
logCompactionCommitTime, metadata);
- } finally {
- this.txnManager.endTransaction(Option.of(logCompactionInstant));
- }
- WriteMarkersFactory.get(config.getMarkersType(), table,
logCompactionCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-
HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant
->
- metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs,
metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
- );
- }
- LOG.info("Log Compacted successfully on commit " +
logCompactionCommitTime);
+ preCommit(logCompactionInstant, metadata);
+ tableServiceClient.completeLogCompaction(metadata, table,
logCompactionCommitTime);
}
@Override
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String
logCompactionInstantTime, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(logCompactionInstantTime, WriteOperationType.LOG_COMPACT,
table.getMetaClient());
- HoodieTimeline pendingLogCompactionTimeline =
table.getActiveTimeline().filterPendingLogCompactionTimeline();
- HoodieInstant inflightInstant =
HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
- if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
- LOG.info("Found Log compaction inflight file. Rolling back the commit
and exiting.");
- table.rollbackInflightLogCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
- table.getMetaClient().reloadActiveTimeline();
- throw new HoodieException("Inflight logcompaction file exists");
- }
- logCompactionTimer = metrics.getLogCompactionCtx();
- WriteMarkersFactory.get(config.getMarkersType(), table,
logCompactionInstantTime);
- HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata =
table.logCompact(context, logCompactionInstantTime);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompactionMetadata =
writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
- if (shouldComplete &&
logCompactionMetadata.getCommitMetadata().isPresent()) {
- completeTableService(TableServiceType.LOG_COMPACT,
logCompactionMetadata.getCommitMetadata().get(), table,
logCompactionInstantTime);
- }
- return logCompactionMetadata;
+ return tableServiceClient.logCompact(logCompactionInstantTime,
shouldComplete);
}
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String
clusteringInstant, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(clusteringInstant, WriteOperationType.CLUSTER,
table.getMetaClient());
- HoodieTimeline pendingClusteringTimeline =
table.getActiveTimeline().filterPendingReplaceTimeline();
- HoodieInstant inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
- if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightClustering(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
- table.getMetaClient().reloadActiveTimeline();
- }
- clusteringTimer = metrics.getClusteringCtx();
- LOG.info("Starting clustering at " + clusteringInstant);
- HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata =
table.cluster(context, clusteringInstant);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata =
writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
- // Validation has to be done after cloning. if not, it could result in
dereferencing the write status twice which means clustering could get executed
twice.
- validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
- // TODO : Where is shouldComplete used ?
- if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
- completeTableService(TableServiceType.CLUSTER,
clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
- }
- return clusteringMetadata;
- }
-
- private void completeClustering(HoodieReplaceCommitMetadata metadata,
- HoodieTable table,
- String clusteringCommitTime) {
- List<HoodieWriteStat> writeStats =
metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
- e.getValue().stream()).collect(Collectors.toList());
-
- if
(writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0)
{
- throw new HoodieClusteringException("Clustering failed to write to
files:"
- + writeStats.stream().filter(s -> s.getTotalWriteErrors() >
0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
- }
-
- final HoodieInstant clusteringInstant =
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
- try {
- this.txnManager.beginTransaction(Option.of(clusteringInstant),
Option.empty());
-
- finalizeWrite(table, clusteringCommitTime, writeStats);
- // Update table's metadata (table)
- updateTableMetadata(table, metadata, clusteringInstant);
-
- LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished
with result " + metadata);
-
- table.getActiveTimeline().transitionReplaceInflightToComplete(
- clusteringInstant,
- Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
- } catch (Exception e) {
- throw new HoodieClusteringException("unable to transition clustering
inflight to complete: " + clusteringCommitTime, e);
- } finally {
- this.txnManager.endTransaction(Option.of(clusteringInstant));
- }
- WriteMarkersFactory.get(config.getMarkersType(), table,
clusteringCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (clusteringTimer != null) {
- long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
-
HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant
->
- metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs,
metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
- );
- }
- LOG.info("Clustering successfully on commit " + clusteringCommitTime);
- }
-
- private void
validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>>
clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
- if (clusteringMetadata.getWriteStatuses().isEmpty()) {
- HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
- table.getMetaClient(),
HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
- .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
- "Unable to read clustering plan for instant: " +
clusteringCommitTime));
- throw new HoodieClusteringException("Clustering plan produced 0
WriteStatus for " + clusteringCommitTime
- + " #groups: " + clusteringPlan.getInputGroups().size() + " expected
at least "
- +
clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
- + " write statuses");
- }
- }
-
- private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata
commitMetadata,
- HoodieInstant hoodieInstant) {
- boolean isTableServiceAction =
table.isTableServiceAction(hoodieInstant.getAction(),
hoodieInstant.getTimestamp());
- // Do not do any conflict resolution here as we do with regular writes. We
take the lock here to ensure all writes to metadata table happens within a
- // single lock (single writer). Because more than one write to metadata
table will result in conflicts since all of them updates the same partition.
- table.getMetadataWriter(hoodieInstant.getTimestamp())
- .ifPresent(writer -> ((HoodieTableMetadataWriter)
writer).update(commitMetadata, hoodieInstant.getTimestamp(),
isTableServiceAction));
- }
-
- @Override
- protected void initMetadataTable(Option<String> instantTime) {
- // Initialize Metadata Table to make sure it's bootstrapped _before_ the
operation,
- // if it didn't exist before
- // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
- initializeMetadataTable(instantTime);
- }
-
- /**
- * Initialize the metadata table if needed. Creating the metadata table
writer
- * will trigger the initial bootstrapping from the data table.
- *
- * @param inFlightInstantTimestamp - The in-flight action responsible for
the metadata table initialization
- */
- private void initializeMetadataTable(Option<String>
inFlightInstantTimestamp) {
- if (config.isMetadataTableEnabled()) {
- HoodieTableMetadataWriter writer =
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(),
config,
- context, Option.empty(), inFlightInstantTimestamp);
- try {
- writer.close();
- } catch (Exception e) {
- throw new HoodieException("Failed to instantiate Metadata table ", e);
- }
- }
- }
Review Comment:
This is a problem, and I'm going to fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]