nsivabalan commented on code in PR #13295: URL: https://github.com/apache/hudi/pull/13295#discussion_r2113374578
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.table.HoodieSparkMergeOnReadMetadataTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import org.apache.spark.api.java.JavaRDD; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; + +/** + * 
Write client to assist with writing to metadata table. + * @param <T> + */ +public class SparkRDDMetadataWriteClient<T> extends SparkRDDWriteClient<T> { + + // tracks the instants for which preWrite has been invoked. + private final Set<String> preWriteCompletedInstants = new HashSet<>(); + + public SparkRDDMetadataWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) { + super(context, clientConfig); + } + + public SparkRDDMetadataWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, + Option<EmbeddedTimelineService> timelineService) { + super(context, writeConfig, timelineService); + } + + /** + * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. + * <p> + * This implementation requires that the input records are already tagged, and de-duped if needed. + * + * @param preppedRecords Prepared HoodieRecords to upsert + * @param instantTime Instant time of the commit + * @return Collection of WriteStatus to inspect errors and counts + */ + public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<List<HoodieFileGroupId>> partitionFileIdPairsOpt) { Review Comment: yes, you are right. eventually when we call writeClient.commit() for the metadata table, we will pass in both sets of HoodieWriteStats over which the marker reconciliation will kick in. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.table.HoodieSparkMergeOnReadMetadataTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import org.apache.spark.api.java.JavaRDD; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; + +/** + * Write client to assist with writing to metadata table. + * @param <T> + */ +public class SparkRDDMetadataWriteClient<T> extends SparkRDDWriteClient<T> { + + // tracks the instants for which preWrite has been invoked. 
+ private final Set<String> preWriteCompletedInstants = new HashSet<>(); + + public SparkRDDMetadataWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) { + super(context, clientConfig); + } + + public SparkRDDMetadataWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, + Option<EmbeddedTimelineService> timelineService) { + super(context, writeConfig, timelineService); + } + + /** + * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. + * <p> + * This implementation requires that the input records are already tagged, and de-duped if needed. + * + * @param preppedRecords Prepared HoodieRecords to upsert + * @param instantTime Instant time of the commit + * @return Collection of WriteStatus to inspect errors and counts + */ + public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<List<HoodieFileGroupId>> partitionFileIdPairsOpt) { + HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = + initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime)); + table.validateUpsertSchema(); + boolean initialCall = !preWriteCompletedInstants.contains(instantTime); + if (initialCall) { + // we do not want to call prewrite more than once for the same instant, since we could be writing to metadata table more than once w/ streaming writes. 
+ preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); + preWriteCompletedInstants.add(instantTime); + } + HoodieWriteMetadata<HoodieData<WriteStatus>> result = ((HoodieSparkMergeOnReadMetadataTable) table).upsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords), + partitionFileIdPairsOpt, initialCall); + HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses())); + return postWrite(resultRDD, instantTime, table); + } + + /** + * Complete changes performed at the given instantTime marker with specified action. + */ + @Override + public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, Review Comment: :) ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertCommitActionExecutor.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.WorkloadProfile; +import org.apache.hudi.table.WorkloadStat; +import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor; + +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.storage.StorageLevel; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Upsert commit action executor for Metadata table. + * + * @param <T> + */ +public class SparkMetadataTableUpsertCommitActionExecutor<T> extends SparkUpsertPreppedDeltaCommitActionExecutor<T> { + + private static final WorkloadStat PLACEHOLDER_GLOBAL_STAT = new WorkloadStat(); + private final List<HoodieFileGroupId> mdtFileGroupIdList; + private final boolean initialCall; + + public SparkMetadataTableUpsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, + HoodieData<HoodieRecord<T>> preppedRecords, List<HoodieFileGroupId> mdtFileGroupIdList, + boolean initialCall) { + super(context, config, table, instantTime, preppedRecords); + this.mdtFileGroupIdList = mdtFileGroupIdList; + this.initialCall = initialCall; + } + + @Override + protected boolean shouldPersistInputRecords(JavaRDD<HoodieRecord<T>> inputRDD) { + return inputRDD.getStorageLevel() == StorageLevel.NONE(); + } + + @Override + protected WorkloadProfile prepareWorkloadProfile(HoodieData<HoodieRecord<T>> 
inputRecordsWithClusteringUpdate) { + // create workload profile only when we are writing to FILES partition in Metadata table. + WorkloadProfile workloadProfile = new WorkloadProfile(Pair.of(Collections.emptyMap(), PLACEHOLDER_GLOBAL_STAT)); + return workloadProfile; + } + + protected void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime) + throws HoodieCommitException { + // with streaming writes support, we might write to metadata table multiple times for the same instant times. + // ie. writeClient.startCommit(t1), writeClient.upsert(batch1, t1), writeClient.upsert(batch2, t1), writeClient.commit(t1, ...) + // So, here we are generating inflight file only in the last known writes, which we know will only have FILES partition. + if (initialCall) { Review Comment: Synced up face-to-face; we are good here. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -1370,7 +1371,9 @@ public void close() throws Exception { protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing, Option<BulkInsertPartitioner> bulkInsertPartitioner) { ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet."); - HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap); + Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> result = prepRecords(partitionRecordsMap); + HoodieData<HoodieRecord> preppedRecords = result.getKey(); + List<HoodieFileGroupId> hoodieFileGroupIdList = result.getValue(); Review Comment: Yes, but not in this patch. In subsequent patches, we will fix the upsertAndCommit() method below to take this list as input.
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertPartitioner.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.Option; + +import java.util.List; +import java.util.Map; + +import scala.Tuple2; + +/** + * Upsert Partitioner to be used for metadata table in spark. All records are prepped (location known) already wrt metadata table. So, we could optimize the upsert partitioner by avoiding certain + * unnecessary computations. + * @param <T> + */ +public class SparkMetadataTableUpsertPartitioner<T> extends SparkHoodiePartitioner<T> { + + private final List<BucketInfo> bucketInfoList; + private final int totalPartitions; + private final Map<String, Integer> fileIdToSparkPartitionIndexMap; + + public SparkMetadataTableUpsertPartitioner(List<BucketInfo> bucketInfoList, Map<String, Integer> fileIdToSparkPartitionIndexMap) { + super(null, null); // passing null since these are never used from {@link SparkHoodiePartitioner}. 
+ this.bucketInfoList = bucketInfoList; + this.totalPartitions = bucketInfoList.size(); + this.fileIdToSparkPartitionIndexMap = fileIdToSparkPartitionIndexMap; + } + + @Override + public int numPartitions() { + return totalPartitions; Review Comment: would like to get the size once and re-use it. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client; + +import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.table.HoodieSparkMergeOnReadMetadataTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import org.apache.spark.api.java.JavaRDD; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; + +/** + * Write client to assist with writing to metadata table. + * @param <T> + */ +public class SparkRDDMetadataWriteClient<T> extends SparkRDDWriteClient<T> { + + // tracks the instants for which preWrite has been invoked. + private final Set<String> preWriteCompletedInstants = new HashSet<>(); + + public SparkRDDMetadataWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) { + super(context, clientConfig); + } + + public SparkRDDMetadataWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, + Option<EmbeddedTimelineService> timelineService) { + super(context, writeConfig, timelineService); + } + Review Comment: yes, this will only be initialized in SparkHoodieBackedTableMetadataWriter. So, we can leave it as is. don't need to be over protective here. 
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.table.HoodieSparkMergeOnReadMetadataTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import org.apache.spark.api.java.JavaRDD; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; + +/** + * 
Write client to assist with writing to metadata table. + * @param <T> + */ +public class SparkRDDMetadataWriteClient<T> extends SparkRDDWriteClient<T> { + + // tracks the instants for which preWrite has been invoked. + private final Set<String> preWriteCompletedInstants = new HashSet<>(); + + public SparkRDDMetadataWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) { + super(context, clientConfig); + } + + public SparkRDDMetadataWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, + Option<EmbeddedTimelineService> timelineService) { + super(context, writeConfig, timelineService); + } + + /** + * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. + * <p> + * This implementation requires that the input records are already tagged, and de-duped if needed. + * + * @param preppedRecords Prepared HoodieRecords to upsert + * @param instantTime Instant time of the commit + * @return Collection of WriteStatus to inspect errors and counts + */ + public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<List<HoodieFileGroupId>> partitionFileIdPairsOpt) { + HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = + initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime)); + table.validateUpsertSchema(); + boolean initialCall = !preWriteCompletedInstants.contains(instantTime); + if (initialCall) { + // we do not want to call prewrite more than once for the same instant, since we could be writing to metadata table more than once w/ streaming writes. 
+ preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); + preWriteCompletedInstants.add(instantTime); + } + HoodieWriteMetadata<HoodieData<WriteStatus>> result = ((HoodieSparkMergeOnReadMetadataTable) table).upsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords), + partitionFileIdPairsOpt, initialCall); + HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses())); + return postWrite(resultRDD, instantTime, table); + } + + /** + * Complete changes performed at the given instantTime marker with specified action. + */ + @Override + public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, Review Comment: Based on the face-to-face discussion, this may not be feasible, or it might lead to unintended DAG triggers, which we wanted to avoid. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
