umehrot2 commented on a change in pull request #2208:
URL: https://github.com/apache/hudi/pull/2208#discussion_r516418986
##########
File path: packaging/hudi-utilities-bundle/pom.xml
##########
@@ -105,6 +107,7 @@
<include>io.prometheus:simpleclient_common</include>
<include>com.yammer.metrics:metrics-core</include>
<include>org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version}</include>
+<include>org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version}</include>
Review comment:
Seems like it's possible that `scala.binary.version` defaults to `2.11` when
compiling, and that can conflict here because Spark 3 only supports `2.12`.
We should probably override the Scala versions by default in the spark3
Maven profile so that such scenarios cannot happen.
##########
File path: pom.xml
##########
@@ -100,6 +104,7 @@
<prometheus.version>0.8.0</prometheus.version>
<http.version>4.4.1</http.version>
<spark.version>2.4.4</spark.version>
+ <spark2.version>2.4.4</spark2.version>
Review comment:
If a dependency or property is configured in both the parent and child
POMs with different values, the child POM's value takes priority.
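To illustrate that precedence, a minimal sketch (the child value shown here is hypothetical, just to make the override visible):

```xml
<!-- parent pom.xml -->
<properties>
  <spark.version>2.4.4</spark.version>
</properties>

<!-- child module pom.xml: for this module, 3.0.0 wins over the parent's 2.4.4 -->
<properties>
  <spark.version>3.0.0</spark.version>
</properties>
```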
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
##########
@@ -173,4 +176,17 @@ public static InternalRow getInternalRowWithError(String partitionPath) {
        .withBulkInsertParallelism(2);
  }
+  private static InternalRow serializeRow(ExpressionEncoder encoder, Row row)
+      throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, ClassNotFoundException {
+    // TODO remove reflection if Spark 2.x support is dropped
+    if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) {
+      Method spark2method = encoder.getClass().getMethod("toRow", Object.class);
+      return (InternalRow) spark2method.invoke(encoder, row);
Review comment:
It might make sense to create `Spark2RowSerializer` and
`Spark3RowSerializer` similar to the implementations we have created for
deserializers.
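A minimal sketch of the version-based dispatch this suggests. The interface and class names below are hypothetical (modeled on the `SparkRowDeserializer` pattern mentioned above); the real implementations would wrap the encoder call for each Spark major version, so reflection happens once at construction rather than on every row:

```java
// Hypothetical interface mirroring the existing SparkRowDeserializer pattern.
interface SparkRowSerializer {
    // Would take a Row and return an InternalRow in the real implementation.
    Object serialize(Object row);
}

public class RowSerializerFactory {
    /**
     * Pick the serializer implementation once, based on the running Spark
     * version, instead of reflecting inside every serializeRow() call.
     * The returned class names are hypothetical.
     */
    public static String serializerClassFor(String sparkVersion) {
        if (sparkVersion.startsWith("2.")) {
            return "org.apache.hudi.Spark2RowSerializer";
        } else if (sparkVersion.startsWith("3.")) {
            return "org.apache.hudi.Spark3RowSerializer";
        }
        throw new IllegalArgumentException("Unsupported Spark version: " + sparkVersion);
    }

    public static void main(String[] args) {
        System.out.println(serializerClassFor("2.4.4"));
        System.out.println(serializerClassFor("3.0.0"));
    }
}
```

The factory would instantiate the matching class reflectively at startup; after that, each call site uses the interface directly.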
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -96,4 +99,13 @@ object AvroConversionUtils {
val name = HoodieAvroUtils.sanitizeName(tableName)
(s"${name}_record", s"hoodie.${name}")
}
+
+  def createDeserializer(encoder: ExpressionEncoder[Row]): SparkRowDeserializer = {
Review comment:
I think `HoodieSparkUtils` is a more appropriate place for this function.
##########
File path:
hudi-spark2/src/main/scala/org/apache/hudi/DataSourceOptionsForSpark2.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+
+/**
+ * Options supported for writing hoodie tables.
+ * TODO: This file is partially copied from org.apache.hudi.DataSourceWriteOptions.
+ * Should be removed if Spark 2.x support is dropped.
+ */
Review comment:
The javadoc formatting is off at various places in this class.
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -121,6 +122,9 @@ private[hudi] object HoodieSparkSqlWriter {
// short-circuit if bulk_insert via row is enabled.
// scalastyle:off
if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean) {
+ if (SPARK_VERSION.startsWith("3.")) {
+      throw new HoodieException("Bulk insert via row is not compatible with Spark 3, it is only compatible with Spark 2!")
+ }
Review comment:
Yeah. In any case, I think this message can be changed to: `Bulk insert using
row writer is not supported with Spark 3. To use row writer switch to spark 2.`.
##########
File path: pom.xml
##########
@@ -1318,6 +1325,23 @@
</plugins>
</build>
</profile>
+
+ <profile>
+ <id>spark3</id>
+ <properties>
+ <spark.version>${spark3.version}</spark.version>
Review comment:
Override the Scala versions here as well?
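A sketch of what that override might look like in this profile. The property names follow the parent pom.xml's existing conventions; the exact Scala patch version is an assumption:

```xml
<profile>
  <id>spark3</id>
  <properties>
    <spark.version>${spark3.version}</spark.version>
    <!-- Assumed additions: pin Scala to 2.12, the only binary
         version Spark 3 is published for. -->
    <scala.version>2.12.10</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>
</profile>
```

This way, activating `-Pspark3` cannot leave `scala.binary.version` at a Spark-2-era default.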
##########
File path: hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
##########
@@ -18,7 +18,7 @@
package org.apache.hudi.internal;
-import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.DataSourceUtilsForSpark2;
Review comment:
I had misunderstood and thought you moved `DefaultSource.scala`, which is the
main datasource implementation. But it seems you have moved the internal
datasource implementation used for bulk insert v2, so this looks fine to me.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
##########
@@ -85,7 +85,7 @@
.collect(Collectors.toList()));
return inputPaths.stream().map(path -> {
- setInputPath(jobConf, path);
+ FileInputFormat.setInputPaths(jobConf, path);
Review comment:
We discussed this internally and it is not a concern anymore.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]