[
https://issues.apache.org/jira/browse/HUDI-1105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375301#comment-17375301
]
ASF GitHub Bot commented on HUDI-1105:
--------------------------------------
vinothchandar commented on a change in pull request #2206:
URL: https://github.com/apache/hudi/pull/2206#discussion_r664284205
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -98,6 +98,8 @@
public static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
public static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
public static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
+ public static final String COMBINE_BEFORE_BULK_INSERT_PROP = "hoodie.combine.before.bulk.insert";
Review comment:
All these configs need to be redone based on ConfigProperty/HoodieConfig.
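A rough sketch of the direction, for illustration only: the key, its default, and its documentation live together in one descriptor instead of in separate `*_PROP` / `DEFAULT_*` string constants. `ConfigPropertySketch` and `WriteConfigSketch` are hypothetical stand-ins written for this example, not Hudi's actual ConfigProperty/HoodieConfig classes, and the doc string is invented. The key and default are taken from the diff context above.

```java
import java.util.Properties;

// Hypothetical, minimal stand-in for a typed config descriptor.
// It is NOT Hudi's ConfigProperty class; names and methods are illustrative only.
final class ConfigPropertySketch {
  private final String key;
  private final String defaultValue;
  private final String doc;

  ConfigPropertySketch(String key, String defaultValue, String doc) {
    this.key = key;
    this.defaultValue = defaultValue;
    this.doc = doc;
  }

  String key() {
    return key;
  }

  String defaultValue() {
    return defaultValue;
  }

  String doc() {
    return doc;
  }

  // Returns the configured value, or the declared default when the key is unset.
  String valueOrDefault(Properties props) {
    return props.getProperty(key, defaultValue);
  }
}

// Hypothetical holder showing how a write config could expose the descriptor.
final class WriteConfigSketch {
  // Key and default copied from the diff context; the doc string is illustrative.
  static final ConfigPropertySketch COMBINE_BEFORE_DELETE = new ConfigPropertySketch(
      "hoodie.combine.before.delete",
      "true",
      "Whether to combine (deduplicate) records before a delete.");

  private WriteConfigSketch() {
  }

  static boolean shouldCombineBeforeDelete(Properties props) {
    return Boolean.parseBoolean(COMBINE_BEFORE_DELETE.valueOrDefault(props));
  }
}
```

With this shape the getter never sees a missing default, since the fallback is part of the descriptor itself.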
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/SparkRowWriteHelper.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+/**
+ * Helper class to assist in deduplicating Rows for BulkInsert with Rows.
+ */
+public class SparkRowWriteHelper {
+
+ private SparkRowWriteHelper() {
+ }
+
+ private static class WriteHelperHolder {
+
+ private static final SparkRowWriteHelper SPARK_WRITE_HELPER = new SparkRowWriteHelper();
Review comment:
Why the singleton? Can't we just use a static method?
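To make the suggestion concrete, here is a minimal sketch of the static-method shape, written in plain Java without Spark so it runs on its own. `RowDedupSketch` and `keepLatest` are hypothetical names, not part of the PR. The comparison mirrors the `>= 0` check in the diff, so ties keep the first argument.

```java
import java.util.function.Function;

// Hypothetical utility class: no holder, no instance, only a static method.
final class RowDedupSketch {

  private RowDedupSketch() {
    // Not instantiable; all behaviour is static.
  }

  // Returns whichever value has the larger ordering field.
  // On a tie the first argument is kept, matching `compareTo(...) >= 0` in the diff.
  static <T, C extends Comparable<C>> T keepLatest(T v1, T v2, Function<T, C> orderingField) {
    return orderingField.apply(v1).compareTo(orderingField.apply(v2)) >= 0 ? v1 : v2;
  }
}
```

Callers would then write `RowDedupSketch.keepLatest(a, b, fieldGetter)` directly, with no `newInstance()` indirection.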
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/SparkRowWriteHelper.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Helper class to assist in deduplicating Rows for BulkInsert with Rows.
+ */
+public class SparkRowWriteHelper {
+
+ private SparkRowWriteHelper() {
+ }
+
+ private static class WriteHelperHolder {
+
Review comment:
nit: extra line?
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
##########
@@ -96,9 +97,15 @@
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.FILENAME_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType));
+
+ Dataset<Row> dedupedDf = rowDatasetWithHoodieColumns;
+ if (config.shouldCombineBeforeBulkInsert()) {
Review comment:
I understand that the new config is only used here as of this PR, but
from a user's standpoint, `combine.before.insert` was already controlling
this on the non-row-writer path. We should just make it consistent.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -306,6 +308,10 @@ public boolean shouldCombineBeforeInsert() {
return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_INSERT_PROP));
}
+ public boolean shouldCombineBeforeBulkInsert() {
Review comment:
So far, we have used one config, `combine.before.insert`, to control this
for both insert and bulk_insert. Can we keep it that way? Otherwise, won't
it be backwards incompatible, i.e. a user may expect
`combine.before.insert` to continue to take effect for bulk_insert as well,
and it won't?
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/SparkRowWriteHelper.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Helper class to assist in deduplicating Rows for BulkInsert with Rows.
+ */
+public class SparkRowWriteHelper {
+
+ private SparkRowWriteHelper() {
+ }
+
+ private static class WriteHelperHolder {
+
+ private static final SparkRowWriteHelper SPARK_WRITE_HELPER = new SparkRowWriteHelper();
+ }
+
+ public static SparkRowWriteHelper newInstance() {
+ return SparkRowWriteHelper.WriteHelperHolder.SPARK_WRITE_HELPER;
+ }
+
+ public Dataset<Row> deduplicateRows(Dataset<Row> inputDf, String preCombineField, boolean isGlobalIndex) {
+ ExpressionEncoder encoder = getEncoder(inputDf.schema());
+
+ return inputDf.groupByKey(
+ (MapFunction<Row, String>) value ->
+ isGlobalIndex ? (value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)) :
+ (value.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)), Encoders.STRING())
+ .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
+ if (((Comparable) v1.getAs(preCombineField)).compareTo(((Comparable) v2.getAs(preCombineField))) >= 0) {
+ return v1;
+ } else {
+ return v2;
+ }
+ }
+ ).map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, encoder);
+ }
+
+ private ExpressionEncoder getEncoder(StructType schema) {
+ List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
+ .map(Attribute::toAttribute).collect(Collectors.toList());
+ return RowEncoder.apply(schema)
Review comment:
Have you tested this with both Spark 2 and Spark 3? Some of these classes
may differ between versions and actually fail.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/SparkRowWriteHelper.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Helper class to assist in deduplicating Rows for BulkInsert with Rows.
+ */
+public class SparkRowWriteHelper {
+
+ private SparkRowWriteHelper() {
+ }
+
+ private static class WriteHelperHolder {
+
+ private static final SparkRowWriteHelper SPARK_WRITE_HELPER = new SparkRowWriteHelper();
+ }
+
+ public static SparkRowWriteHelper newInstance() {
+ return SparkRowWriteHelper.WriteHelperHolder.SPARK_WRITE_HELPER;
+ }
+
+ public Dataset<Row> deduplicateRows(Dataset<Row> inputDf, String preCombineField, boolean isGlobalIndex) {
+ ExpressionEncoder encoder = getEncoder(inputDf.schema());
+
+ return inputDf.groupByKey(
Review comment:
Let's use reduceByKey(), which we use for the RDD path. groupByKey() can
hog memory.
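As a plain-Java analogy for the reduce-style approach (this is not Spark code and makes no claim about how Spark executes either call): records are folded into a map one at a time, so only the current winner per key is retained rather than a full group of candidates. `ReduceByKeySketch`, `Rec`, and its fields are hypothetical names. The key construction follows the diff: record key alone for a global index, partition path + "+" + record key otherwise.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReduceByKeySketch {

  // Hypothetical record shape standing in for a Row with Hudi metadata columns.
  static final class Rec {
    final String partitionPath;
    final String recordKey;
    final long preCombineValue;

    Rec(String partitionPath, String recordKey, long preCombineValue) {
      this.partitionPath = partitionPath;
      this.recordKey = recordKey;
      this.preCombineValue = preCombineValue;
    }
  }

  private ReduceByKeySketch() {
  }

  // Same key choice as the diff: global indexes dedupe on record key only.
  static String dedupKey(Rec r, boolean isGlobalIndex) {
    return isGlobalIndex ? r.recordKey : r.partitionPath + "+" + r.recordKey;
  }

  // Folds the input into one surviving record per key.
  // Map.merge keeps the existing entry on ties, matching the `>= 0` check in the diff.
  static Map<String, Rec> reduceByKey(List<Rec> input, boolean isGlobalIndex) {
    Map<String, Rec> latest = new LinkedHashMap<>();
    for (Rec r : input) {
      latest.merge(dedupKey(r, isGlobalIndex), r,
          (existing, incoming) -> existing.preCombineValue >= incoming.preCombineValue ? existing : incoming);
    }
    return latest;
  }
}
```

The point of the sketch is the memory profile: at most one record per key is held at any time during the fold.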
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -333,7 +334,9 @@ private[hudi] object HoodieSparkSqlWriter {
log.info(s"Registered avro schema : ${schema.toString(true)}")
val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA, schema.toString)
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params))
- val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
+ val isGlobalIndex = SparkHoodieIndex.createIndex(writeConfig).isGlobal
Review comment:
Do we really need to create the index to check whether it's global?
Wondering if there are simpler means (maybe not, given we even support
user-defined indexes).
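One possible "simpler means", sketched under stated assumptions: answer from the config alone when the index type is a well-known one, and report "unknown" otherwise so the caller can fall back to instantiating the index. The property keys `hoodie.index.type` and `hoodie.index.class`, and the two sets of type names, are assumptions for this example and would need to be checked against the actual index implementations. `GlobalIndexCheckSketch` is a hypothetical class.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

final class GlobalIndexCheckSketch {
  // Assumed property keys; verify against the real write config before relying on them.
  static final String INDEX_TYPE_PROP = "hoodie.index.type";
  static final String INDEX_CLASS_PROP = "hoodie.index.class";

  // Assumed classification by type name; deliberately incomplete.
  private static final Set<String> ASSUMED_GLOBAL =
      new HashSet<>(Arrays.asList("GLOBAL_BLOOM", "GLOBAL_SIMPLE"));
  private static final Set<String> ASSUMED_NON_GLOBAL =
      new HashSet<>(Arrays.asList("BLOOM", "SIMPLE"));

  private GlobalIndexCheckSketch() {
  }

  // Returns Optional.empty() when the answer cannot be derived from config alone,
  // e.g. a user-defined index class or an unrecognised type.
  static Optional<Boolean> isGlobalFromConfig(Properties props) {
    String indexClass = props.getProperty(INDEX_CLASS_PROP, "");
    if (!indexClass.trim().isEmpty()) {
      return Optional.empty();
    }
    String type = props.getProperty(INDEX_TYPE_PROP);
    if (type == null) {
      return Optional.empty();
    }
    String normalized = type.trim().toUpperCase(Locale.ROOT);
    if (ASSUMED_GLOBAL.contains(normalized)) {
      return Optional.of(true);
    }
    if (ASSUMED_NON_GLOBAL.contains(normalized)) {
      return Optional.of(false);
    }
    return Optional.empty();
  }
}
```

Because user-defined indexes always land in the "unknown" branch, this only saves the instantiation for built-in types, which may or may not be worth the extra code path.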
> Bulk insert dataset - Dedup
> ---------------------------
>
> Key: HUDI-1105
> URL: https://issues.apache.org/jira/browse/HUDI-1105
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: Writer Core
> Affects Versions: 0.9.0
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.9.0
>
>