nsivabalan commented on code in PR #8978:
URL: https://github.com/apache/hudi/pull/8978#discussion_r1237826497
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -83,6 +83,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
*/
private final transient StructType schema;
+ private boolean isDeleted;
Review Comment:
Can you add Javadocs explaining how `isDeleted` is deduced?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##########
@@ -105,6 +106,11 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>>
delete(HoodieEngineContext c
return new
SparkDeleteDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context,
config, this, instantTime, keys).execute();
}
+ @Override
+ public HoodieWriteMetadata<HoodieData<WriteStatus>>
deletePrepped(HoodieEngineContext context, String instantTime,
HoodieData<HoodieRecord<T>> preppedRecords) {
Review Comment:
We don't need to override this explicitly here if we use
SparkDeletePreppedCommitActionExecutor for MOR as well.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeletePreppedDeltaCommitActionExecutor.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+public class SparkDeletePreppedDeltaCommitActionExecutor<T>
+ extends BaseSparkDeltaCommitActionExecutor<T> {
+
+ private final HoodieData<HoodieRecord<T>> preppedRecords;
+
+ public SparkDeletePreppedDeltaCommitActionExecutor(HoodieSparkEngineContext
context,
Review Comment:
Why do we need this class? Can't we use
SparkDeletePreppedCommitActionExecutor for both COW and MOR tables?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##########
@@ -38,15 +39,14 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable)
extends HoodieLeafRunn
logInfo(s"Executing 'DELETE FROM' command for $tableId")
val condition = sparkAdapter.extractDeleteCondition(dft)
-
- val targetLogicalPlan = stripMetaFieldAttributes(dft.table)
val filteredPlan = if (condition != null) {
- Filter(condition, targetLogicalPlan)
+ Filter(condition, dft.table)
} else {
- targetLogicalPlan
+ dft.table
}
- val config = buildHoodieDeleteTableConfig(catalogTable, sparkSession)
+ var config = buildHoodieDeleteTableConfig(catalogTable, sparkSession)
Review Comment:
Can we avoid the `var` and stick with `val`?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala:
##########
@@ -194,7 +192,7 @@ object HoodieCreateRecordUtils {
true
}
- private def validateMetaFieldsInAvroRecords(avroRec: GenericRecord): Unit = {
+ def validateMetaFieldsInAvroRecords(avroRec: GenericRecord): Unit = {
Review Comment:
Why was the `private` access modifier removed?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java:
##########
@@ -153,6 +154,27 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(
return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config,
this, instantTime, keys).execute();
}
+ /**
+ * Delete the given prepared records from the Hoodie table, at the supplied
instantTime.
+ *
+ * <p>This implementation requires that the input records are already
tagged, and de-duped if needed.
+ *
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
+ * the underneath file.
+ *
+ * @param context {@link HoodieEngineContext}
+ * @param instantTime Instant Time for the action
+ * @param preppedRecords Hoodie records to delete
+ * @return {@link HoodieWriteMetadata}
+ */
+ public HoodieWriteMetadata<List<WriteStatus>> deletePrepped(
Review Comment:
Why do we have two `deletePrepped` methods?
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##########
@@ -41,13 +42,30 @@ public Option<Pair<HoodieRecord, Schema>>
merge(HoodieRecord older, Schema oldSc
ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecordType.SPARK);
ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecordType.SPARK);
- if (newer.getData() == null) {
- // Delete record
- return Option.empty();
+ if (newer instanceof HoodieSparkRecord) {
+ HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+ if (newSparkRecord.isDeleted()) {
+ // Delete record
+ return Option.empty();
+ }
+ } else {
+ if (newer.getData() == null) {
Review Comment:
`newer` cannot be an instance of anything other than HoodieSparkRecord, since
we are within HoodieSparkRecordMerger.
So, we can drop this else branch.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -254,11 +254,26 @@ object HoodieSparkSqlWriter {
val (writeResult, writeClient: SparkRDDWriteClient[_]) =
operation match {
- case WriteOperationType.DELETE =>
+ case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
val genericRecords = HoodieSparkUtils.createRdd(df,
avroRecordName, avroRecordNamespace)
// Convert to RDD[HoodieKey]
- val keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(hoodieConfig.getProps))
- val hoodieKeysToDelete = genericRecords.map(gr =>
keyGenerator.getKey(gr)).toJavaRDD()
+ val isPrepped =
hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
+ val keyGenerator: Option[BaseKeyGenerator] = if (isPrepped) {
+ None
+ } else {
+ Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(hoodieConfig.getProps))
+ .asInstanceOf[BaseKeyGenerator])
+ }
+
+ var validatePreppedRecord = true
Review Comment:
Actually, this variable is accessed by all tasks, but it lives on the driver
and we are updating it there, so we should make it an atomic variable. I'm
also wondering whether this validation is really required, or whether we
could simply remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]