the-other-tim-brown commented on code in PR #13286:
URL: https://github.com/apache/hudi/pull/13286#discussion_r2134835649
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -36,6 +39,37 @@
*/
public interface HoodieTableMetadataWriter<I,O> extends Serializable,
AutoCloseable {
+ /**
+ * Starts a new commit in metadata table for optimized write flow.
+ * @param instantTime
+ */
+ void startCommit(String instantTime);
+
+ /**
+ * Prepare records and write to Metadata table for all eligible partitions
except FILES partition. This will be used in optimized writes,
+ * where in data table writes statuses are maintained as HoodieData and
based on that, we prepare records and write to Metadata table
+ * partitions (except FILES). Caution should be followed to ensure the
action is not triggered on the incoming HoodieData < WriteStatus >
+ * and for the writes to metadata table. Caller is expected to trigger
collect just once for both set of HoodieData < WriteStatus >.
+ * @param writeStatus {@link HoodieData} of {@link WriteStatus} from data
table writes.
+ * @param instantTime instant time of interest.
+ * @return {@link HoodieData} of {@link WriteStatus} for writes to metadata
table.
+ */
+ HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime);
Review Comment:
Streaming is one of those terms that can mean different things in different
contexts; is there another term we could use to describe this write pattern?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -161,6 +185,13 @@ protected void upsertAndCommit(BaseHoodieWriteClient<?,
JavaRDD<HoodieRecord>, ?
writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
}
+ @Override
+ protected void upsertAndCommit(BaseHoodieWriteClient<?,
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs,
+ List<HoodieFileGroupId>
mdtFileGroupsIdsToUpdate) {
+ JavaRDD<WriteStatus> writeStatusJavaRDD =
((SparkRDDMetadataWriteClient)writeClient).upsertPreppedRecords(preppedRecordInputs,
instantTime, Option.of(mdtFileGroupsIdsToUpdate));
Review Comment:
it may be cleaner to move this casting to a helper method
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -36,6 +39,37 @@
*/
public interface HoodieTableMetadataWriter<I,O> extends Serializable,
AutoCloseable {
+ /**
+ * Starts a new commit in metadata table for optimized write flow.
Review Comment:
Right now we have a special flow in the BaseHoodieWriteClient for starting
commits for the metadata table. Would it be possible to have that flow through
this method instead?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -200,12 +218,17 @@ protected
HoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf,
}
}
ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT
Reader should have been opened post initialization");
+
+ this.metadataIndexGenerator = streamingWrites ?
Option.of(getMetadataIndexGenerator()) : Option.empty();
+ this.streamingWritesEnabled = streamingWrites;
}
List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig
metadataConfig, HoodieTableMetaClient metaClient) {
return MetadataPartitionType.getEnabledPartitions(metadataConfig,
metaClient);
}
+ abstract MetadataIndexGenerator getMetadataIndexGenerator();
Review Comment:
nitpick: the naming of this overlaps with the getter for the instance
variable. Can this be named something like `initializeMetadataIndexGenerator`
instead?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**
+ * For now this is a placeholder to generate all MDT records in one place.
+ * Once https://github.com/apache/hudi/pull/13226 is landed, we will leverage
the new abstraction to generate MDT records.
+ */
+public class MetadataIndexGenerator implements Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MetadataIndexGenerator.class);
+
+ /**
+ * MDT record generation utility. This function is expected to be invoked
from a map Partition call, where one spark task will receive
+ * one WriteStatus as input and the output contains prepared Metadata table
records for all eligible partitions that can operate on one
+ * WriteStatus instance only.
+ */
+ static class WriteStatusBasedMetadataIndexGenerator implements
SerializableFunction<WriteStatus, Iterator<HoodieRecord>> {
+ List<MetadataPartitionType> enabledPartitionTypes;
+ HoodieWriteConfig dataWriteConfig;
+ StorageConfiguration<?> storageConf;
+ String instantTime;
Review Comment:
make this `private final`?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Unit tests {@link HoodieCreateHandle}.
Review Comment:
Is this the right class referenced?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -148,6 +158,20 @@ protected JavaRDD<HoodieRecord>
convertHoodieDataToEngineSpecificData(HoodieData
return HoodieJavaRDD.getJavaRDD(records);
}
+ @Override
+ protected HoodieData<WriteStatus>
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+ return HoodieJavaRDD.of(records);
+ }
+
+ @Override
+ public JavaRDD<WriteStatus>
streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords, String
instantTime) {
+ JavaRDD<HoodieRecord> mdtRecords =
HoodieJavaRDD.getJavaRDD(hoodieFileGroupsToUpdateAndTaggedMdtRecords.getValue());
+ engineContext.setJobStatus(this.getClass().getSimpleName(),
String.format("Upserting at %s into metadata table %s", instantTime,
metadataWriteConfig.getTableName()));
+ JavaRDD<WriteStatus> metadataWriteStatusesSoFar =
((SparkRDDMetadataWriteClient)getWriteClient()).upsertPreppedRecords(mdtRecords,
instantTime, Option.of(
Review Comment:
nitpick: formatting - put a space after `(SparkRDDMetadataWriteClient)`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1096,6 +1119,126 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
+ public void startCommit(String instantTime) {
+ ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes
should be enabled for startCommit API");
+
+ if
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
{
+ // if this is a new commit being applied to metadata for the first time
+ LOG.info("New commit at {} being applied to MDT.", instantTime);
+ } else {
+ throw new HoodieMetadataException("Starting the same commit in Metadata
table more than once w/o rolling back : " + instantTime);
+ }
+
+ // this is where we might instantiate the write client to metadata table
for the first time.
+ getWriteClient().startCommitForMetadataTable(metadataMetaClient,
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+ }
+
+ @Override
+ public HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime) {
+ List<MetadataPartitionType> mdtPartitionsToTag = new
ArrayList<>(enabledPartitionTypes);
+ mdtPartitionsToTag.remove(FILES);
+ mdtPartitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+ if (mdtPartitionsToTag.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ HoodieData<HoodieRecord> untaggedMdtRecords = writeStatus.flatMap(
+ new
MetadataIndexGenerator.WriteStatusBasedMetadataIndexGenerator(mdtPartitionsToTag,
dataWriteConfig, storageConf, instantTime));
+
+ // tag records w/ location
+ Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>>
hoodieFileGroupsToUpdateAndTaggedMdtRecords =
tagRecordsWithLocationWithStreamingWrites(untaggedMdtRecords,
+ mdtPartitionsToTag.stream().map(mdtPartition ->
mdtPartition.getPartitionPath()).collect(
+ Collectors.toSet()));
+
+ // write partial writes to mdt table (for those partitions where streaming
writes are enabled)
+ HoodieData<WriteStatus> mdtWriteStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
instantTime));
+ // dag not yet de-referenced. do not invoke any action on
mdtWriteStatusHoodieData yet.
+ return mdtWriteStatusHoodieData;
+ }
+
+ /**
+ * Upsert the tagged records to metadata table in the streaming flow.
+ * @param hoodieFileGroupsToUpdateAndTaggedMdtRecords Pair of {@link List}
of {@link HoodieFileGroupId} referring to list of all file groups ids that
could receive updates
+ * and {@link HoodieData}
of tagged {@link HoodieRecord}s.
+ * @param instantTime instant time of interest.
+ * @return
+ */
+ protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords, String
instantTime) {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
+ @Override
+ public void completeStreamingCommit(String instantTime, HoodieEngineContext
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata
metadata) {
+ List<HoodieWriteStat> allWriteStats = new
ArrayList<>(metadataWriteStatsSoFar);
+ // update metadata for left over partitions which does not have streaming
writes support.
+ allWriteStats.addAll(prepareAndWriteToNonStreamingPartitions(context,
metadata, instantTime).map(writeStatus ->
writeStatus.getStat()).collectAsList());
+ getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(),
HoodieTimeline.DELTA_COMMIT_ACTION,
+ Collections.emptyMap(), Option.empty());
+ }
+
+ private HoodieData<WriteStatus>
prepareAndWriteToNonStreamingPartitions(HoodieEngineContext context,
HoodieCommitMetadata commitMetadata, String instantTime) {
+ Set<String> mdtPartitionsToUpdate =
getNonStreamingMetadataPartitionsToUpdate();
+ Map<String, HoodieData<HoodieRecord>> mdtPartitionsAndUnTaggedRecords =
new MdtPartitionRecordGeneratorBatchMode(instantTime, commitMetadata,
mdtPartitionsToUpdate)
+ .convertMetadata();
+
+ HoodieData<HoodieRecord> untaggedMdtRecords = context.emptyHoodieData();
+ for (Map.Entry<String, HoodieData<HoodieRecord>> entry:
mdtPartitionsAndUnTaggedRecords.entrySet()) {
+ untaggedMdtRecords = untaggedMdtRecords.union(entry.getValue());
+ }
+
+ // write to mdt table for non streaming mdt partitions
+ Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> taggedRecords =
tagRecordsWithLocationWithStreamingWrites(untaggedMdtRecords,
mdtPartitionsToUpdate);
+ HoodieData<WriteStatus> mdtWriteStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedRecords,
instantTime));
+ return mdtWriteStatusHoodieData;
+ }
+
+ private Set<String> getNonStreamingMetadataPartitionsToUpdate() {
+ Set<String> toReturn =
enabledPartitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+ STREAMING_WRITES_SUPPORTED_PARTITIONS.forEach(metadataPartitionType ->
toReturn.remove(metadataPartitionType.getPartitionPath()));
+ return toReturn;
+ }
+
+ /**
+ * Returns List of pair of partition name and MDT fileId updated in the
partition along with the tagged MDT records.
+ * @param untaggedMdtRecords Untagged MDT records
+ */
+ protected Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>>
tagRecordsWithLocationWithStreamingWrites(HoodieData<HoodieRecord>
untaggedMdtRecords,
+
Set<String> enabledMetadataPartitions) {
+ List<HoodieFileGroupId> updatedMDTFileGroupIds = new ArrayList<>();
+ // Fetch latest file slices for all enabled MDT partitions
+ Map<String, List<FileSlice>> mdtPartitionLatestFileSlices = new
HashMap<>();
+ try (HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+ enabledMetadataPartitions.forEach(partitionName -> {
+ List<FileSlice> fileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
+ if (fileSlices.isEmpty()) {
+ // scheduling of INDEX only initializes the file group and not add
commit
Review Comment:
is this case generally applicable or should the code check the metadata
partition type? Would it be possible the file groups were from failed commits?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestCreateHandle.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.UUID;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieCreateHandle}.
+ */
+@SuppressWarnings("checkstyle:LineLength")
Review Comment:
why is this needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]