umehrot2 commented on a change in pull request #2208:
URL: https://github.com/apache/hudi/pull/2208#discussion_r516418986



##########
File path: packaging/hudi-utilities-bundle/pom.xml
##########
@@ -105,6 +107,7 @@
                   <include>io.prometheus:simpleclient_common</include>
                   <include>com.yammer.metrics:metrics-core</include>
                   <include>org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version}</include>
+                  <include>org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version}</include>

Review comment:
       It seems possible that `scala.binary.version` defaults to `2.11` when 
compiling, which could conflict here because Spark 3 only supports `2.12`. We 
should probably override the Scala versions as well in the spark3 maven 
profile so that such scenarios cannot happen.

##########
File path: pom.xml
##########
@@ -100,6 +104,7 @@
     <prometheus.version>0.8.0</prometheus.version>
     <http.version>4.4.1</http.version>
     <spark.version>2.4.4</spark.version>
+    <spark2.version>2.4.4</spark2.version>

Review comment:
       If a dependency or property is configured in both the parent and child 
POMs with different values, the child POM value takes priority.
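
   As a quick illustration of this precedence (the child module name and the 
`3.0.0` value are hypothetical, just to show the override):

   ```xml
   <!-- Parent pom.xml -->
   <properties>
     <spark.version>2.4.4</spark.version>
   </properties>

   <!-- Child module pom.xml: this value wins when resolving the child -->
   <properties>
     <spark.version>3.0.0</spark.version>
   </properties>
   ```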

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
##########
@@ -173,4 +176,17 @@ public static InternalRow getInternalRowWithError(String partitionPath) {
         .withBulkInsertParallelism(2);
   }
 
+  private static InternalRow serializeRow(ExpressionEncoder encoder, Row row)
+      throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, ClassNotFoundException {
+    // TODO remove reflection if Spark 2.x support is dropped
+    if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) {
+      Method spark2method = encoder.getClass().getMethod("toRow", Object.class);
+      return (InternalRow) spark2method.invoke(encoder, row);

Review comment:
       It might make sense to create `Spark2RowSerializer` and 
`Spark3RowSerializer` similar to the implementations we have created for 
deserializers.
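
   A minimal, self-contained sketch of that pattern. Note this is illustrative 
only: `RowSerializer`, `Spark2RowSerializer`, and `FakeEncoder` are stand-ins, 
not the real Spark or Hudi types, and the encoder is invoked reflectively here 
just so the sketch compiles without Spark on the classpath:

   ```java
   import java.lang.reflect.Method;

   // Hypothetical interface mirroring the existing deserializer abstraction.
   interface RowSerializer {
     Object serialize(Object row) throws Exception;
   }

   // Spark 2.x flavor: dispatches to the encoder's toRow(Object) method.
   class Spark2RowSerializer implements RowSerializer {
     private final Object encoder;
     Spark2RowSerializer(Object encoder) { this.encoder = encoder; }
     public Object serialize(Object row) throws Exception {
       Method toRow = encoder.getClass().getMethod("toRow", Object.class);
       return toRow.invoke(encoder, row);
     }
   }

   public class SerializerDemo {
     // Stand-in for a Spark 2 ExpressionEncoder: exposes toRow(Object).
     public static class FakeEncoder {
       public Object toRow(Object row) { return "row:" + row; }
     }

     // Pick the implementation once, based on the Spark version string,
     // instead of branching on every serialize call.
     static RowSerializer create(String sparkVersion, Object encoder) {
       if (sparkVersion.startsWith("2.")) {
         return new Spark2RowSerializer(encoder);
       }
       throw new UnsupportedOperationException("Spark3RowSerializer would go here");
     }

     public static void main(String[] args) throws Exception {
       RowSerializer s = create("2.4.4", new FakeEncoder());
       System.out.println(s.serialize("abc")); // prints row:abc
     }
   }
   ```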

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -96,4 +99,13 @@ object AvroConversionUtils {
     val name = HoodieAvroUtils.sanitizeName(tableName)
     (s"${name}_record", s"hoodie.${name}")
   }
+
+  def createDeserializer(encoder: ExpressionEncoder[Row]): SparkRowDeserializer = {

Review comment:
       I think `HoodieSparkUtils` is a more appropriate place for this function.

##########
File path: hudi-spark2/src/main/scala/org/apache/hudi/DataSourceOptionsForSpark2.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+
+/**
+  * Options supported for writing hoodie tables.
+  * TODO: This file is partially copied from org.apache.hudi.DataSourceWriteOptions.
+  * Should be removed if Spark 2.x support is dropped.
+  */

Review comment:
       The javadoc formatting is off at various places in this class.

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -121,6 +122,9 @@ private[hudi] object HoodieSparkSqlWriter {
       // short-circuit if bulk_insert via row is enabled.
       // scalastyle:off
       if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean) {
+        if (SPARK_VERSION.startsWith("3.")) {
+          throw new HoodieException("Bulk insert via row is not compatible with Spark 3, it is only compatible with Spark 2!")
+        }

Review comment:
       Yeah. Anyway, I think this message can be changed to: `Bulk insert using 
row writer is not supported with Spark 3. To use row writer switch to spark 2.`.

##########
File path: pom.xml
##########
@@ -1318,6 +1325,23 @@
         </plugins>
       </build>
     </profile>
+
+    <profile>
+      <id>spark3</id>
+      <properties>
+        <spark.version>${spark3.version}</spark.version>

Review comment:
       Override the Scala versions here too?
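
   For example, the profile could also pin the Scala properties (a sketch; the 
exact patch version is illustrative, and it assumes the parent POM already 
defines `scala.version` and `scala.binary.version`):

   ```xml
   <profile>
     <id>spark3</id>
     <properties>
       <spark.version>${spark3.version}</spark.version>
       <!-- Spark 3 is built against Scala 2.12 only, so override the defaults -->
       <scala.version>2.12.10</scala.version>
       <scala.binary.version>2.12</scala.binary.version>
     </properties>
   </profile>
   ```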

##########
File path: hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
##########
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.internal;
 
-import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.DataSourceUtilsForSpark2;

Review comment:
       I had misunderstood and thought you moved `DefaultSource.scala`, which 
is the main datasource implementation. But it seems you have moved the internal 
datasource implementation used for bulk insert v2, so this looks fine to me.

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
##########
@@ -85,7 +85,7 @@
         .collect(Collectors.toList()));
 
     return inputPaths.stream().map(path -> {
-      setInputPath(jobConf, path);
+      FileInputFormat.setInputPaths(jobConf, path);

Review comment:
       We discussed this internally and it is not a concern anymore.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

