This is an automated email from the ASF dual-hosted git repository.

uditme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fce1453  [HUDI-1040] Make Hudi support Spark 3 (#2208)
fce1453 is described below

commit fce1453fa608fcff5df4d5aca8c88107d4151b09
Author: wenningd <[email protected]>
AuthorDate: Wed Dec 9 15:52:23 2020 -0800

    [HUDI-1040] Make Hudi support Spark 3 (#2208)
    
    * Fix flaky MOR unit test
    
    * Update Spark APIs to make them compatible with both Spark 2 & Spark 3
    
    * Refactor the bulk insert v2 code path so that Hudi can compile with Spark 3
    
    * Add a spark3 profile to handle the fasterxml & Spark versions
    
    * Create a hudi-spark-common module & refactor the hudi-spark related modules
    
    Co-authored-by: Wenning Ding <[email protected]>
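
    The cross-version strategy running through this patch: version-neutral code
    moves to hudi-spark-common, Spark-2-only and Spark-3-only code lives in
    hudi-spark2/hudi-spark3, and the right implementation is chosen at runtime
    by inspecting the Spark version. A minimal sketch of the dispatch pattern,
    mirroring HoodieSparkUtils.createDeserializer further down in this diff
    (encoder is an ExpressionEncoder[Row] already in scope):

        import org.apache.spark.SPARK_VERSION

        // Pick the implementation that matches the running Spark major.
        val deserializer: SparkRowDeserializer =
          if (SPARK_VERSION.startsWith("2.")) new Spark2RowDeserializer(encoder)
          else new Spark3RowDeserializer(encoder)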
---
 LICENSE                                            |   2 +
 README.md                                          |   8 +
 .../hudi/client/utils/SparkRowDeserializer.java    |  14 +-
 .../org/apache/hudi/AvroConversionUtils.scala      |  28 +--
 .../apache/hudi/io/TestHoodieRowCreateHandle.java  |   7 +-
 .../TestHoodieInternalRowParquetWriter.java        |   3 +-
 .../hudi/testutils/HoodieMergeOnReadTestUtils.java |  59 +++---
 .../hudi/testutils/SparkDatasetTestUtils.java      |  20 +-
 .../view/RemoteHoodieTableFileSystemView.java      |   2 +-
 hudi-integ-test/pom.xml                            |   8 +-
 .../integ/testsuite/reader/SparkBasedReader.java   |   6 +-
 .../java/org/apache/hudi/integ/ITTestBase.java     |   6 +-
 hudi-spark-datasource/hudi-spark-common/pom.xml    | 178 +++++++++++++++++
 .../main/java/org/apache/hudi/DataSourceUtils.java |   0
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   4 +-
 .../hudi-spark}/pom.xml                            |  22 ++-
 .../hudi-spark}/run_hoodie_app.sh                  |   2 +-
 .../hudi-spark}/run_hoodie_generate_app.sh         |   2 +-
 .../hudi-spark}/run_hoodie_streaming_app.sh        |   2 +-
 .../org/apache/hudi/HoodieDataSourceHelpers.java   |   0
 .../apache/hudi/HoodieDatasetBulkInsertHelper.java |   0
 .../main/java/org/apache/hudi/QuickstartUtils.java |   0
 .../async/SparkStreamingAsyncCompactService.java   |   0
 .../SparkParquetBootstrapDataProvider.java         |   4 +-
 .../exception/HoodieDeltaStreamerException.java    |   0
 .../org/apache/hudi/payload/AWSDmsAvroPayload.java |   0
 ...org.apache.spark.sql.sources.DataSourceRegister |   0
 .../main/scala/org/apache/hudi/DefaultSource.scala |   0
 .../scala/org/apache/hudi/HoodieBootstrapRDD.scala |   0
 .../org/apache/hudi/HoodieBootstrapRelation.scala  |   0
 .../org/apache/hudi/HoodieEmptyRelation.scala      |   0
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |   0
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   8 +-
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   | 121 ++++++++++++
 .../org/apache/hudi/HoodieStreamingSink.scala      |   0
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   0
 .../org/apache/hudi/IncrementalRelation.scala      |   0
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |   3 -
 .../src/main/scala/org/apache/hudi/package.scala   |   0
 .../hudi-spark}/src/test/java/HoodieJavaApp.java   |   0
 .../src/test/java/HoodieJavaGenerateApp.java       |   0
 .../src/test/java/HoodieJavaStreamingApp.java      |   4 +-
 .../java/org/apache/hudi/TestDataSourceUtils.java  |   0
 .../hudi/TestHoodieDatasetBulkInsertHelper.java    |   0
 .../java/org/apache/hudi/client/TestBootstrap.java |   0
 .../apache/hudi/payload/TestAWSDmsAvroPayload.java |   0
 .../apache/hudi/testutils/DataSourceTestUtils.java |   0
 .../src/test/resources/exampleSchema.txt           |   0
 .../test/resources/log4j-surefire-quiet.properties |   0
 .../src/test/resources/log4j-surefire.properties   |   0
 .../org/apache/hudi/TestAvroConversionHelper.scala |   0
 .../org/apache/hudi/TestDataSourceDefaults.scala   |   0
 .../org/apache/hudi/TestHoodieSparkUtils.scala     |   0
 .../functional/HoodieSparkSqlWriterSuite.scala     |   0
 .../apache/hudi/functional/TestCOWDataSource.scala |   0
 .../functional/TestDataSourceForBootstrap.scala    |   0
 .../apache/hudi/functional/TestMORDataSource.scala |   0
 .../hudi/functional/TestStructuredStreaming.scala  |   4 +-
 .../hudi-spark2}/pom.xml                           | 220 ++-------------------
 .../org/apache/hudi/internal/DefaultSource.java    |   0
 .../HoodieBulkInsertDataInternalWriter.java        |   0
 .../HoodieBulkInsertDataInternalWriterFactory.java |   0
 .../internal/HoodieDataSourceInternalWriter.java   |   0
 .../hudi/internal/HoodieWriterCommitMessage.java   |   0
 .../org/apache/hudi/Spark2RowDeserializer.scala    |  24 +--
 .../TestHoodieBulkInsertDataInternalWriter.java    |  12 +-
 .../TestHoodieDataSourceInternalWriter.java        |  15 +-
 hudi-spark-datasource/hudi-spark3/pom.xml          | 163 +++++++++++++++
 .../org/apache/hudi/Spark3RowDeserializer.scala    |  25 +--
 hudi-spark-datasource/pom.xml                      |  39 ++++
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  50 -----
 hudi-utilities/pom.xml                             |  15 ++
 .../org/apache/hudi/utilities/UtilHelpers.java     |   4 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   6 +-
 .../deltastreamer/SourceFormatAdapter.java         |   5 +-
 packaging/hudi-integ-test-bundle/pom.xml           |  14 ++
 packaging/hudi-spark-bundle/pom.xml                |  18 ++
 packaging/hudi-utilities-bundle/pom.xml            |  19 ++
 pom.xml                                            |  33 +++-
 79 files changed, 766 insertions(+), 413 deletions(-)

diff --git a/LICENSE b/LICENSE
index 1e21747..385191d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -246,6 +246,8 @@ This product includes code from Apache Spark
 
 * org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package
 
+* org.apache.hudi.HoodieSparkUtils.scala copied some methods from org.apache.spark.deploy.SparkHadoopUtil.scala
+
 Copyright: 2014 and onwards The Apache Software Foundation
 Home page: http://spark.apache.org/
 License: http://www.apache.org/licenses/LICENSE-2.0
diff --git a/README.md b/README.md
index 2416b3f..427d859 100644
--- a/README.md
+++ b/README.md
@@ -76,6 +76,14 @@ The default Scala version supported is 2.11. To build for Scala 2.12 version, bu
 mvn clean package -DskipTests -Dscala-2.12
 ```
 
+### Build with Spark 3.0.0
+
+The default Spark version supported is 2.4.4. To build for Spark 3.0.0, build using the `spark3` profile
+
+```
+mvn clean package -DskipTests -Dspark3
+```
+
 ### Build without spark-avro module
 
 The default hudi-jar bundles spark-avro module. To build without spark-avro module, build using `spark-shade-unbundle-avro` profile
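
Side note on the README addition above: the hudi-spark3 module introduced
below is built only against Scala 2.12 (its artifact id is hudi-spark3_2.12),
so a Spark 3 build presumably needs the Scala 2.12 flag as well, combining the
two profiles:

    mvn clean package -DskipTests -Dscala-2.12 -Dspark3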
diff --git a/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java
similarity index 75%
copy from hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
copy to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java
index e939d62..66b8b78 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java
@@ -16,15 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.exception;
+package org.apache.hudi.client.utils;
 
-public class HoodieDeltaStreamerException extends HoodieException {
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 
-  public HoodieDeltaStreamerException(String msg, Throwable e) {
-    super(msg, e);
-  }
+import java.io.Serializable;
 
-  public HoodieDeltaStreamerException(String msg) {
-    super(msg);
-  }
+public interface SparkRowDeserializer extends Serializable {
+  Row deserializeRow(InternalRow internalRow);
 }
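
The SparkRowDeserializer interface above gets one implementation per Spark
major: Spark2RowDeserializer and Spark3RowDeserializer appear in the diffstat
but their bodies are elided here. A plausible sketch of the pair, assuming the
stock encoder APIs of each version (Spark 2.x exposes fromRow directly on the
encoder, Spark 3.x replaced it with ExpressionEncoder.createDeserializer):

    import org.apache.hudi.client.utils.SparkRowDeserializer
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

    // Compiles only against Spark 2.x, hence it lives in hudi-spark2.
    class Spark2RowDeserializer(encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer {
      override def deserializeRow(internalRow: InternalRow): Row = encoder.fromRow(internalRow)
    }

    // Compiles only against Spark 3.x, hence it lives in hudi-spark3.
    class Spark3RowDeserializer(encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer {
      private val deserializer = encoder.createDeserializer()
      override def deserializeRow(internalRow: InternalRow): Row = deserializer(internalRow)
    }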
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index d1a4249..8810126 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -21,41 +21,15 @@ package org.apache.hudi
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.model.HoodieKey
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.SchemaConverters
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
 import scala.collection.JavaConverters._
 
 object AvroConversionUtils {
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
-    val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    createRdd(df, avroSchema, structName, recordNamespace)
-  }
-
-  def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
-  : RDD[GenericRecord] = {
-    // Use the Avro schema to derive the StructType which has the correct nullability information
-    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
-    val encoder = RowEncoder.apply(dataType).resolveAndBind()
-    df.queryExecution.toRdd.map(encoder.fromRow)
-      .mapPartitions { records =>
-        if (records.isEmpty) Iterator.empty
-        else {
-          val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
-          records.map { x => convertor(x).asInstanceOf[GenericRecord] }
-        }
-      }
-  }
-
-  def createRddForDeletes(df: DataFrame, rowField: String, partitionField: String): RDD[HoodieKey] = {
-    df.rdd.map(row => new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField)))
-  }
-
   def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = {
     if (rdd.isEmpty()) {
       ss.emptyDataFrame
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java
index c7a313a..edce777 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java
@@ -72,7 +72,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testRowCreateHandle() throws IOException {
+  public void testRowCreateHandle() throws Exception {
     // init config and table
     HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
@@ -113,7 +113,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
    * should be thrown.
    */
   @Test
-  public void testGlobalFailure() throws IOException {
+  public void testGlobalFailure() throws Exception {
     // init config and table
     HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
@@ -179,7 +179,8 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
     }
   }
 
-  private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle) throws IOException {
+  private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle)
+      throws Exception {
     List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
     // issue writes
     for (InternalRow internalRow : internalRows) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieInternalRowParquetWriter.java
index 37b8cdc..2b344db 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieInternalRowParquetWriter.java
@@ -35,7 +35,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -64,7 +63,7 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
   }
 
   @Test
-  public void endToEndTest() throws IOException {
+  public void endToEndTest() throws Exception {
     HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
     for (int i = 0; i < 5; i++) {
       // init write support and parquet config
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
index 8104ef7..5633551 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapred.RecordReader;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -84,36 +83,32 @@ public class HoodieMergeOnReadTestUtils {
         .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
         .collect(Collectors.toList()));
 
-    return inputPaths.stream().map(path -> {
-      setInputPath(jobConf, path);
-      List<GenericRecord> records = new ArrayList<>();
-      try {
-        List<InputSplit> splits = Arrays.asList(inputFormat.getSplits(jobConf, 1));
-        for (InputSplit split : splits) {
-          RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
-          Object key = recordReader.createKey();
-          ArrayWritable writable = (ArrayWritable) recordReader.createValue();
-          while (recordReader.next(key, writable)) {
-            GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema);
-            // writable returns an array with [field1, field2, _hoodie_commit_time,
-            // _hoodie_commit_seqno]
-            Writable[] values = writable.get();
-            schema.getFields().stream()
-                .filter(f -> !projectCols || projectedColumns.contains(f.name()))
-                .map(f -> Pair.of(projectedSchema.getFields().stream()
-                    .filter(p -> f.name().equals(p.name())).findFirst().get(), f))
-                .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()]));
-            records.add(newRecord.build());
-          }
+    List<GenericRecord> records = new ArrayList<>();
+    try {
+      FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
+      InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size());
+
+      for (InputSplit split : splits) {
+        RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
+        Object key = recordReader.createKey();
+        ArrayWritable writable = (ArrayWritable) recordReader.createValue();
+        while (recordReader.next(key, writable)) {
+          GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema);
+          // writable returns an array with [field1, field2, _hoodie_commit_time,
+          // _hoodie_commit_seqno]
+          Writable[] values = writable.get();
+          schema.getFields().stream()
+              .filter(f -> !projectCols || projectedColumns.contains(f.name()))
+              .map(f -> Pair.of(projectedSchema.getFields().stream()
+                  .filter(p -> f.name().equals(p.name())).findFirst().get(), f))
+              .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()]));
+          records.add(newRecord.build());
         }
-      } catch (IOException ie) {
-        ie.printStackTrace();
       }
-      return records;
-    }).reduce((a, b) -> {
-      a.addAll(b);
-      return a;
-    }).orElse(new ArrayList<>());
+    } catch (IOException ie) {
+      ie.printStackTrace();
+    }
+    return records;
   }
 
   private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List<String> projectedCols) {
@@ -156,10 +151,4 @@ public class HoodieMergeOnReadTestUtils {
     configurable.setConf(conf);
     jobConf.addResource(conf);
   }
-
-  private static void setInputPath(JobConf jobConf, String inputPath) {
-    jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
-    jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
-    jobConf.set("map.input.dir", inputPath);
-  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
index 04a1712..3d2019d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 
+import org.apache.spark.package$;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
@@ -41,6 +42,8 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -139,11 +142,11 @@ public class SparkDatasetTestUtils {
    * @param rows Dataset<Row>s to be converted
    * @return the List of {@link InternalRow}s thus converted.
    */
-  public static List<InternalRow> toInternalRows(Dataset<Row> rows, ExpressionEncoder encoder) {
+  public static List<InternalRow> toInternalRows(Dataset<Row> rows, ExpressionEncoder encoder) throws Exception {
     List<InternalRow> toReturn = new ArrayList<>();
     List<Row> rowList = rows.collectAsList();
     for (Row row : rowList) {
-      toReturn.add(encoder.toRow(row).copy());
+      toReturn.add(serializeRow(encoder, row).copy());
     }
     return toReturn;
   }
@@ -173,4 +176,17 @@ public class SparkDatasetTestUtils {
         .withBulkInsertParallelism(2);
   }
 
+  private static InternalRow serializeRow(ExpressionEncoder encoder, Row row)
+      throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, ClassNotFoundException {
+    // TODO remove reflection if Spark 2.x support is dropped
+    if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) {
+      Method spark2method = encoder.getClass().getMethod("toRow", Object.class);
+      return (InternalRow) spark2method.invoke(encoder, row);
+    } else {
+      Class<?> serializerClass = Class.forName("org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer");
+      Object serializer = encoder.getClass().getMethod("createSerializer").invoke(encoder);
+      Method aboveSpark2method = serializerClass.getMethod("apply", Object.class);
+      return (InternalRow) aboveSpark2method.invoke(serializer, row);
+    }
+      return (InternalRow) aboveSpark2method.invoke(serializer, row);
+    }
+  }
 }
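
The reflection above bridges an encoder API change between the two majors:
Spark 2.x has ExpressionEncoder.toRow(row), while Spark 3.x removed it in
favor of a dedicated serializer object. Compiled directly against one version
at a time (which this shared test utility cannot do, hence the reflection),
the equivalent calls would look roughly like:

    // Spark 2.x only: serialize a Row straight through the encoder.
    val internalRow = encoder.toRow(row)

    // Spark 3.x only: obtain an ExpressionEncoder.Serializer, then apply it.
    val serializer = encoder.createSerializer()
    val internalRow2 = serializer(row)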
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index d42592b..91a28a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -171,7 +171,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
         break;
     }
     String content = response.returnContent().asString();
-    return mapper.readValue(content, reference);
+    return (T) mapper.readValue(content, reference);
   }
 
   private Map<String, String> getParamsWithPartitionPath(String partitionPath) {
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index e9fcc61..90ca94d 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -206,12 +206,11 @@
     <dependency>
       <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-yaml</artifactId>
-      <version>2.7.4</version>
+      <version>${fasterxml.jackson.dataformat.yaml.version}</version>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <version>2.6.7.3</version>
     </dependency>
 
     <!-- Fasterxml - Test-->
@@ -221,11 +220,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>com.fasterxml.jackson.datatype</groupId>
       <artifactId>jackson-datatype-guava</artifactId>
       <scope>test</scope>
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java
index 16a5259..fc23a47 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java
@@ -20,7 +20,7 @@ package org.apache.hudi.integ.testsuite.reader;
 
 import java.util.List;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.spark.api.java.JavaRDD;
@@ -49,7 +49,7 @@ public class SparkBasedReader {
         .option(AVRO_SCHEMA_OPTION_KEY, schemaStr)
         .load(JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq());
 
-    return AvroConversionUtils
+    return HoodieSparkUtils
         .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
             nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE))
         .toJavaRDD();
@@ -61,7 +61,7 @@ public class SparkBasedReader {
     Dataset<Row> dataSet = sparkSession.read()
         .parquet((JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq()));
 
-    return AvroConversionUtils
+    return HoodieSparkUtils
         .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
             RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)
         .toJavaRDD();
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index 8cb64f8..80ed1d4 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -61,9 +61,9 @@ public abstract class ITTestBase {
   protected static final String HIVESERVER = "/hiveserver";
   protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
   protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
-  protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
-  protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_generate_app.sh";
-  protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh";
+  protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh";
+  protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh";
+  protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh";
   protected static final String HUDI_HADOOP_BUNDLE =
       HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar";
   protected static final String HUDI_HIVE_SYNC_BUNDLE =
diff --git a/hudi-spark-datasource/hudi-spark-common/pom.xml b/hudi-spark-datasource/hudi-spark-common/pom.xml
new file mode 100644
index 0000000..af6403d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/pom.xml
@@ -0,0 +1,178 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hudi-spark-datasource</artifactId>
+    <groupId>org.apache.hudi</groupId>
+    <version>0.6.1-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hudi-spark-common</artifactId>
+  <version>${parent.version}</version>
+
+  <name>hudi-spark-common</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <main.basedir>${project.parent.parent.basedir}</main.basedir>
+  </properties>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>net.alchim31.maven</groupId>
+          <artifactId>scala-maven-plugin</artifactId>
+          <version>${scala-maven-plugin.version}</version>
+          <configuration>
+            <args>
+              <arg>-nobootcp</arg>
+            </args>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+              <overWriteReleases>true</overWriteReleases>
+              <overWriteSnapshots>true</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>add-source</goal>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>compile</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- Scala -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <!-- Hoodie -->
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-hive-sync</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>
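
Per the diffstat at the top, this new parent pom anchors the following module
layout (annotations added for orientation):

    hudi-spark-datasource/
      hudi-spark-common/   version-neutral code (DataSourceOptions, DataSourceUtils, ...)
      hudi-spark/          the datasource itself, depends on the other three
      hudi-spark2/         Spark-2-only code (Spark2RowDeserializer, ...)
      hudi-spark3/         Spark-3-only code (Spark3RowDeserializer, ...)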
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
rename to hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
similarity index 99%
rename from hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 8a3ccf7..1b6e49b 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -36,7 +36,7 @@ import org.apache.log4j.LogManager
   */
 object DataSourceReadOptions {
 
-  private val log = LogManager.getLogger(classOf[DefaultSource])
+  private val log = LogManager.getLogger(DataSourceReadOptions.getClass)
 
   /**
     * Whether data needs to be read, in
@@ -143,7 +143,7 @@ object DataSourceReadOptions {
   */
 object DataSourceWriteOptions {
 
-  private val log = LogManager.getLogger(classOf[DefaultSource])
+  private val log = LogManager.getLogger(DataSourceWriteOptions.getClass)
 
   /**
     * The write operation, that this write should do
diff --git a/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml
similarity index 94%
copy from hudi-spark/pom.xml
copy to hudi-spark-datasource/hudi-spark/pom.xml
index 0942327..65efdce 100644
--- a/hudi-spark/pom.xml
+++ b/hudi-spark-datasource/hudi-spark/pom.xml
@@ -17,17 +17,20 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
-    <artifactId>hudi</artifactId>
+    <artifactId>hudi-spark-datasource</artifactId>
     <groupId>org.apache.hudi</groupId>
     <version>0.6.1-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
   <artifactId>hudi-spark_${scala.binary.version}</artifactId>
+  <version>${parent.version}</version>
+
+  <name>hudi-spark_${scala.binary.version}</name>
   <packaging>jar</packaging>
 
   <properties>
-    <main.basedir>${project.parent.basedir}</main.basedir>
+    <main.basedir>${project.parent.parent.basedir}</main.basedir>
   </properties>
 
   <build>
@@ -196,6 +199,21 @@
       <artifactId>hudi-sync-common</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark2_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark3_2.12</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <!-- Logging -->
     <dependency>
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh
similarity index 93%
rename from hudi-spark/run_hoodie_app.sh
rename to hudi-spark-datasource/hudi-spark/run_hoodie_app.sh
index 7c63e74..9782aa3 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh
@@ -23,7 +23,7 @@ function error_exit {
 
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 #Ensure we pick the right jar even for hive11 builds
-HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1`
+HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v sources | head -1`
 
 if [ -z "$HADOOP_CONF_DIR" ]; then
   echo "setting hadoop conf dir"
diff --git a/hudi-spark/run_hoodie_generate_app.sh b/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh
similarity index 93%
rename from hudi-spark/run_hoodie_generate_app.sh
rename to hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh
index a4b4090..a276951 100755
--- a/hudi-spark/run_hoodie_generate_app.sh
+++ b/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh
@@ -23,7 +23,7 @@ function error_exit {
 
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 #Ensure we pick the right jar even for hive11 builds
-HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1`
+HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v sources | head -1`
 
 if [ -z "$HADOOP_CONF_DIR" ]; then
   echo "setting hadoop conf dir"
diff --git a/hudi-spark/run_hoodie_streaming_app.sh b/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh
similarity index 94%
rename from hudi-spark/run_hoodie_streaming_app.sh
rename to hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh
index 01f1a4e..9a81a4c 100755
--- a/hudi-spark/run_hoodie_streaming_app.sh
+++ b/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh
@@ -23,7 +23,7 @@ function error_exit {
 
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 #Ensure we pick the right jar even for hive11 builds
-HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1`
+HUDI_JAR=`ls -c $DIR/../../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v sources | head -1`
 
 if [ -z "$HADOOP_CONF_DIR" ]; then
   echo "setting hadoop conf dir"
diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
similarity index 95%
rename from hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
index 022abe3..6c5eb0e 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.bootstrap;
 
-import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
@@ -65,7 +65,7 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr
       KeyGenerator keyGenerator = DataSourceUtils.createKeyGenerator(props);
       String structName = tableName + "_record";
       String namespace = "hoodie." + tableName;
-      RDD<GenericRecord> genericRecords = AvroConversionUtils.createRdd(inputDataset, structName, namespace);
+      RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace);
       return genericRecords.toJavaRDD().map(gr -> {
         String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
             gr, props.getString("hoodie.datasource.write.precombine.field"), false);
diff --git a/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
rename to hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
diff --git a/hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
similarity index 100%
rename from hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
rename to hudi-spark-datasource/hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
similarity index 100%
rename from hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
similarity index 100%
rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
similarity index 100%
rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala
similarity index 100%
rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
similarity index 100%
rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
similarity index 98%
rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b10a05b..d661036 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -41,6 +41,7 @@ import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
 import org.apache.hudi.internal.HoodieDataSourceInternalWriter
 import org.apache.hudi.sync.common.AbstractSyncTool
 import org.apache.log4j.LogManager
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
@@ -129,6 +130,9 @@ private[hudi] object HoodieSparkSqlWriter {
       // scalastyle:off
       if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean &&
         operation == WriteOperationType.BULK_INSERT) {
+        if (!SPARK_VERSION.startsWith("2.")) {
+          throw new HoodieException("Bulk insert using row writer is not supported with Spark 3. To use row writer please switch to spark 2.")
+        }
         val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
                                                                                 basePath, path, instantTime)
         return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
@@ -148,7 +152,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
           // Convert to RDD[HoodieRecord]
           val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, schema, structName, nameSpace)
+          val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
           val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT);
           val hoodieAllIncomingRecords = genericRecords.map(gr => {
             val hoodieRecord = if (shouldCombine) {
@@ -195,7 +199,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
           // Convert to RDD[HoodieKey]
           val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+          val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace)
           val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
 
           if (!tableExists) {
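
With the guard above, a bulk insert with the row writer enabled now fails fast
on Spark 3 instead of breaking deeper in the stack. Assuming
ENABLE_ROW_WRITER_OPT_KEY resolves to "hoodie.datasource.write.row.writer.enable"
(the key's definition is not part of this diff), a Spark 3 caller would work
around it like so:

    import org.apache.spark.sql.SaveMode

    df.write.format("hudi")
      .option("hoodie.datasource.write.operation", "bulk_insert")
      // assumed key, see above; falls back to the RDD-based write path
      .option("hoodie.datasource.write.row.writer.enable", "false")
      .mode(SaveMode.Append)
      .save(basePath)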
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
new file mode 100644
index 0000000..02880f2
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.client.utils.SparkRowDeserializer
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import scala.collection.JavaConverters._
+
+
+object HoodieSparkUtils {
+
+  def getMetaSchema: StructType = {
+    StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
+      StructField(col, StringType, nullable = true)
+    }))
+  }
+
+  /**
+   * This method is copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
+   * [[org.apache.spark.deploy.SparkHadoopUtil]] became private in Spark 3.0.0 and hence we had to copy it locally.
+   */
+  def isGlobPath(pattern: Path): Boolean = {
+    pattern.toString.exists("{}[]*?\\".toSet.contains)
+  }
+
+  /**
+   * This method is copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
+   * [[org.apache.spark.deploy.SparkHadoopUtil]] became private in Spark 3.0.0 and hence we had to copy it locally.
+   */
+  def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
+    Option(fs.globStatus(pattern)).map { statuses =>
+      statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
+    }.getOrElse(Seq.empty[Path])
+  }
+
+  /**
+   * This method is copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
+   * [[org.apache.spark.deploy.SparkHadoopUtil]] became private in Spark 3.0.0 and hence we had to copy it locally.
+   */
+  def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
+    if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
+  }
+
+  /**
+   * Checks to see whether input path contains a glob pattern and if yes, maps it to a list of absolute paths
+   * which match the glob pattern. Otherwise, returns original path
+   *
+   * @param paths List of absolute or globbed paths
+   * @param fs    File system
+   * @return list of absolute file paths
+   */
+  def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
+    paths.flatMap(path => {
+      val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
+      val globPaths = globPathIfNecessary(fs, qualified)
+      globPaths
+    })
+  }
+
+  def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
+  }
+
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
+    createRdd(df, avroSchema, structName, recordNamespace)
+  }
+
+  def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
+  : RDD[GenericRecord] = {
+    // Use the Avro schema to derive the StructType which has the correct nullability information
+    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val encoder = RowEncoder.apply(dataType).resolveAndBind()
+    val deserializer = HoodieSparkUtils.createDeserializer(encoder)
+    df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
+      .mapPartitions { records =>
+        if (records.isEmpty) Iterator.empty
+        else {
+          val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
+          records.map { x => convertor(x).asInstanceOf[GenericRecord] }
+        }
+      }
+  }
+
+  def createDeserializer(encoder: ExpressionEncoder[Row]): SparkRowDeserializer = {
+    // TODO remove Spark2RowDeserializer if Spark 2.x support is dropped
+    if (SPARK_VERSION.startsWith("2.")) {
+      new Spark2RowDeserializer(encoder)
+    } else {
+      new Spark3RowDeserializer(encoder)
+    }
+  }
+}
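
For callers, the move from AvroConversionUtils.createRdd to
HoodieSparkUtils.createRdd is a straight rename; a minimal usage sketch
(df is any DataFrame, the struct name and namespace are illustrative):

    import org.apache.avro.generic.GenericRecord
    import org.apache.hudi.HoodieSparkUtils
    import org.apache.spark.rdd.RDD

    // structName and recordNamespace drive the generated Avro schema,
    // exactly as with the removed AvroConversionUtils entry point.
    val records: RDD[GenericRecord] =
      HoodieSparkUtils.createRdd(df, "hoodie_record", "hoodie.example")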
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
similarity index 100%
rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
similarity index 100%
rename from hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
similarity index 100%
rename from hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
similarity index 97%
rename from hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index c1a6acd..0b81fa7 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -113,9 +113,6 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
       hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
     )
 
-    // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration.
-    FileSystem.getLocal(jobConf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
     val rdd = new HoodieMergeOnReadRDD(
       sqlContext.sparkContext,
       jobConf,
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/package.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/package.scala
similarity index 100%
copy from hudi-spark/src/main/scala/org/apache/hudi/package.scala
copy to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/package.scala
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
similarity index 100%
rename from hudi-spark/src/test/java/HoodieJavaApp.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
diff --git a/hudi-spark/src/test/java/HoodieJavaGenerateApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
similarity index 100%
rename from hudi-spark/src/test/java/HoodieJavaGenerateApp.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
similarity index 99%
rename from hudi-spark/src/test/java/HoodieJavaStreamingApp.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 606490f..1df12a3 100644
--- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -43,7 +43,7 @@ import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.streaming.ProcessingTime;
+import org.apache.spark.sql.streaming.Trigger;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -366,7 +366,7 @@ public class HoodieJavaStreamingApp {
         .outputMode(OutputMode.Append());
 
     updateHiveSyncConfig(writer);
-    StreamingQuery query = writer.trigger(new ProcessingTime(500)).start(tablePath);
+    StreamingQuery query = writer.trigger(Trigger.ProcessingTime(500)).start(tablePath);
     query.awaitTermination(streamingDurationInMs);
   }
 
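The change above swaps Spark's ProcessingTime class, which was removed in Spark 3, for the Trigger.ProcessingTime factory that both Spark 2.x and 3.x provide (the same substitution appears again in TestStructuredStreaming.scala below). A minimal, self-contained sketch of the new API, using Spark's built-in rate source as a stand-in input; none of these names come from the patch:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    object TriggerMigrationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("trigger-migration-sketch")
          .getOrCreate()

        // The built-in "rate" source emits timestamped rows; it stands in
        // for a real input stream.
        val stream = spark.readStream.format("rate").load()

        val query = stream.writeStream
          .format("console")
          // Trigger.ProcessingTime(intervalMs) replaces `new ProcessingTime(intervalMs)`,
          // which only compiled against Spark 2.
          .trigger(Trigger.ProcessingTime(500))
          .start()

        query.awaitTermination(5000)
        spark.stop()
      }
    }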
diff --git a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
similarity index 100%
rename from hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
diff --git a/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java
similarity index 100%
rename from hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java
diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
similarity index 100%
rename from hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
diff --git a/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java
similarity index 100%
rename from hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java
diff --git a/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
similarity index 100%
rename from hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
diff --git a/hudi-spark/src/test/resources/exampleSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
similarity index 100%
rename from hudi-spark/src/test/resources/exampleSchema.txt
rename to hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
diff --git a/hudi-spark/src/test/resources/log4j-surefire-quiet.properties b/hudi-spark-datasource/hudi-spark/src/test/resources/log4j-surefire-quiet.properties
similarity index 100%
rename from hudi-spark/src/test/resources/log4j-surefire-quiet.properties
rename to hudi-spark-datasource/hudi-spark/src/test/resources/log4j-surefire-quiet.properties
diff --git a/hudi-spark/src/test/resources/log4j-surefire.properties b/hudi-spark-datasource/hudi-spark/src/test/resources/log4j-surefire.properties
similarity index 100%
rename from hudi-spark/src/test/resources/log4j-surefire.properties
rename to hudi-spark-datasource/hudi-spark/src/test/resources/log4j-surefire.properties
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
similarity index 100%
rename from hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
similarity index 100%
rename from hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
similarity index 100%
rename from hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
similarity index 100%
rename from hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
similarity index 100%
rename from hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
similarity index 100%
rename from hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
similarity index 100%
rename from hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
similarity index 98%
rename from hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 226cf53..7a902c1 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
@@ -93,7 +93,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
         .writeStream
         .format("org.apache.hudi")
         .options(commonOpts)
-        .trigger(new ProcessingTime(100))
+        .trigger(Trigger.ProcessingTime(100))
         .option("checkpointLocation", basePath + "/checkpoint")
         .outputMode(OutputMode.Append)
         .start(destPath)
diff --git a/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark2/pom.xml
similarity index 52%
rename from hudi-spark/pom.xml
rename to hudi-spark-datasource/hudi-spark2/pom.xml
index 0942327..8947bb7 100644
--- a/hudi-spark/pom.xml
+++ b/hudi-spark-datasource/hudi-spark2/pom.xml
@@ -6,9 +6,7 @@
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-
        http://www.apache.org/licenses/LICENSE-2.0
-
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,17 +15,20 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
-    <artifactId>hudi</artifactId>
+    <artifactId>hudi-spark-datasource</artifactId>
     <groupId>org.apache.hudi</groupId>
     <version>0.6.1-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
-  <artifactId>hudi-spark_${scala.binary.version}</artifactId>
+  <artifactId>hudi-spark2_${scala.binary.version}</artifactId>
+  <version>${parent.version}</version>
+
+  <name>hudi-spark2_${scala.binary.version}</name>
   <packaging>jar</packaging>
 
   <properties>
-    <main.basedir>${project.parent.basedir}</main.basedir>
+    <main.basedir>${project.parent.parent.basedir}</main.basedir>
   </properties>
 
   <build>
@@ -128,25 +129,6 @@
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
       <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <version>1.0</version>
-        <configuration>
-          <skipTests>${skipUTs}</skipTests>
-          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <junitxml>.</junitxml>
-          <filereports>TestSuite.txt</filereports>
-        </configuration>
-        <executions>
-          <execution>
-            <id>test</id>
-            <goals>
-              <goal>test</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
         <groupId>org.scalastyle</groupId>
         <artifactId>scalastyle-maven-plugin</artifactId>
       </plugin>
@@ -183,163 +165,28 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-hadoop-mr</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-hive-sync</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-sync-common</artifactId>
+      <artifactId>hudi-spark-common</artifactId>
       <version>${project.version}</version>
     </dependency>
 
-    <!-- Logging -->
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-    </dependency>
-
-    <!-- Fasterxml -->
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.module</groupId>
-      <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-    </dependency>
-
-    <!-- Avro -->
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-      <exclusions>
-        <exclusion>
-          <!-- this version to conflict to spark-core_2.12 -->
-          <groupId>com.thoughtworks.paranamer</groupId>
-          <artifactId>paranamer</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <!-- Parquet -->
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-avro</artifactId>
-    </dependency>
-
-    <!-- Spark -->
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-        <exclusions>
-           <exclusion>
-               <groupId>javax.servlet</groupId>
-               <artifactId>*</artifactId>
-           </exclusion>
-        </exclusions>
-    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark2.version}</version>
+      <optional>true</optional>
     </dependency>
 
-    <!-- Spark (Packages) -->
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-avro_${scala.binary.version}</artifactId>
-      <scope>provided</scope>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>3.9.9.Final</version>
+      <optional>true</optional>
     </dependency>
-
-    <!-- Hadoop -->
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-       <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet.jsp</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-     <scope>provided</scope>
-    </dependency>
-
-    <!-- Hive -->
-    <dependency>
-      <groupId>${hive.groupid}</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${hive.version}</version>
-      <classifier>${hive.exec.classifier}</classifier>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.mail</groupId>
-          <artifactId>mail</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty.aggregate</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>${hive.groupid}</groupId>
-      <artifactId>hive-jdbc</artifactId>
-      <version>${hive.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet.jsp</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>${hive.groupid}</groupId>
-      <artifactId>hive-metastore</artifactId>
-      <version>${hive.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet.jsp</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>${hive.groupid}</groupId>
-      <artifactId>hive-common</artifactId>
-      <version>${hive.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.eclipse.jetty.orbit</groupId>
-          <artifactId>javax.servlet</artifactId>
-        </exclusion>
-      </exclusions>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>4.1.17.Final</version>
+      <optional>true</optional>
     </dependency>
 
     <!-- Hoodie - Test -->
@@ -369,41 +216,10 @@
     </dependency>
 
     <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <version>${scalatest.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-api</artifactId>
       <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.junit.jupiter</groupId>
-      <artifactId>junit-jupiter-engine</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.junit.vintage</groupId>
-      <artifactId>junit-vintage-engine</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.junit.jupiter</groupId>
-      <artifactId>junit-jupiter-params</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-junit-jupiter</artifactId>
-      <scope>test</scope>
-    </dependency>
-
   </dependencies>
+
 </project>
diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/internal/DefaultSource.java
rename to hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
rename to hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java
rename to hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
rename to hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
similarity index 100%
rename from hudi-spark/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
rename to hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/package.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala
similarity index 60%
copy from hudi-spark/src/main/scala/org/apache/hudi/package.scala
copy to hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala
index 4d6025f..84fe4c3 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/package.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala
@@ -15,24 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache
+package org.apache.hudi
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}
+import org.apache.hudi.client.utils.SparkRowDeserializer
 
-package object hudi {
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 
-  /**
-    * Adds a method, `hoodie`, to DataFrameWriter
-    */
-  implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
-    def avro: String => Unit = writer.format("org.apache.hudi").save
+class Spark2RowDeserializer(val encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer {
+  def deserializeRow(internalRow: InternalRow): Row = {
+    encoder.fromRow(internalRow)
   }
-
-  /**
-    * Adds a method, `hoodie`, to DataFrameReader
-    */
-  implicit class AvroDataFrameReader(reader: DataFrameReader) {
-    def avro: String => DataFrame = reader.format("org.apache.hudi").load
-  }
-
 }
diff --git a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
similarity index 96%
rename from hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
rename to hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
index 5a5d8b2..ac69af5 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -27,6 +27,7 @@ import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 
+import org.apache.spark.package$;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,7 +35,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -51,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /**
  * Unit tests {@link HoodieBulkInsertDataInternalWriter}.
@@ -61,6 +62,8 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
 
   @BeforeEach
   public void setUp() throws Exception {
+    // this test is only compatible with spark 2
+    assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2."));
     initSparkContexts("TestHoodieBulkInsertDataInternalWriter");
     initPath();
     initFileSystem();
@@ -74,7 +77,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
   }
 
   @Test
-  public void testDataInternalWriter() throws IOException {
+  public void testDataInternalWriter() throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
@@ -119,7 +122,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
   * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk.
    */
   @Test
-  public void testGlobalFailure() throws IOException {
+  public void testGlobalFailure() throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
@@ -165,7 +168,8 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
     assertOutput(inputRows, result, instantTime, fileNames);
   }
 
-  private void writeRows(Dataset<Row> inputRows, HoodieBulkInsertDataInternalWriter writer) throws IOException {
+  private void writeRows(Dataset<Row> inputRows, HoodieBulkInsertDataInternalWriter writer)
+      throws Exception {
     List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
     // issue writes
     for (InternalRow internalRow : internalRows) {
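The assumeTrue guard added to setUp() above is the version-gating trick used throughout this patch: Java reaches Spark's version constant through the package-object accessor package$.MODULE$.SPARK_VERSION(), and a failed JUnit 5 assumption in a @BeforeEach method marks the whole test as skipped when the build runs against Spark 3, since these DataSourceV2 writers compile only against the Spark 2 API. A rough sketch of the same pattern (class name and message are illustrative, not from the patch):

    import org.junit.jupiter.api.Assumptions.assumeTrue
    import org.junit.jupiter.api.Test

    class Spark2OnlyTestSketch {
      @Test
      def runsOnlyOnSpark2(): Unit = {
        // Scala can read the version constant directly; Java needs
        // package$.MODULE$.SPARK_VERSION() because it lives in a package object.
        assumeTrue(org.apache.spark.SPARK_VERSION.startsWith("2."),
          "skipped: requires a Spark 2.x runtime")
        // Spark-2-specific assertions would go here.
      }
    }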
diff --git a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
similarity index 96%
rename from hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
rename to hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
index 89d748f..454c74d 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
@@ -26,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 
+import org.apache.spark.package$;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,7 +35,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -49,6 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /**
  * Unit tests {@link HoodieDataSourceInternalWriter}.
@@ -59,6 +60,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
 
   @BeforeEach
   public void setUp() throws Exception {
+    // this test is only compatible with spark 2
+    assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2."));
     initSparkContexts("TestHoodieDataSourceInternalWriter");
     initPath();
     initFileSystem();
@@ -72,7 +75,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
   }
 
   @Test
-  public void testDataSourceWriter() throws IOException {
+  public void testDataSourceWriter() throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
     String instantTime = "001";
@@ -114,7 +117,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
   }
 
   @Test
-  public void testMultipleDataSourceWrites() throws IOException {
+  public void testMultipleDataSourceWrites() throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
     int partitionCounter = 0;
@@ -158,7 +161,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
   }
 
   @Test
-  public void testLargeWrites() throws IOException {
+  public void testLargeWrites() throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
     int partitionCounter = 0;
@@ -208,7 +211,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
    * verify only records from batch1 is available to read
    */
   @Test
-  public void testAbort() throws IOException {
+  public void testAbort() throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
 
@@ -274,7 +277,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
     assertOutput(totalInputRows, result, instantTime0);
   }
 
-  private void writeRows(Dataset<Row> inputRows, DataWriter<InternalRow> writer) throws IOException {
+  private void writeRows(Dataset<Row> inputRows, DataWriter<InternalRow> writer) throws Exception {
     List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
     // issue writes
     for (InternalRow internalRow : internalRows) {
diff --git a/hudi-spark-datasource/hudi-spark3/pom.xml b/hudi-spark-datasource/hudi-spark3/pom.xml
new file mode 100644
index 0000000..7b1ffa9
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hudi-spark-datasource</artifactId>
+    <groupId>org.apache.hudi</groupId>
+    <version>0.6.1-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hudi-spark3_2.12</artifactId>
+  <version>${parent.version}</version>
+
+  <name>hudi-spark3_2.12</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <main.basedir>${project.parent.parent.basedir}</main.basedir>
+  </properties>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>net.alchim31.maven</groupId>
+          <artifactId>scala-maven-plugin</artifactId>
+          <version>${scala-maven-plugin.version}</version>
+          <configuration>
+            <args>
+              <arg>-nobootcp</arg>
+            </args>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+              <overWriteReleases>true</overWriteReleases>
+              <overWriteSnapshots>true</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>add-source</goal>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>compile</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala12.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_2.12</artifactId>
+      <version>${spark3.version}</version>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+</project> 
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/package.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala
similarity index 60%
rename from hudi-spark/src/main/scala/org/apache/hudi/package.scala
rename to hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala
index 4d6025f..a060655 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/package.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala
@@ -15,24 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache
+package org.apache.hudi
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}
+import org.apache.hudi.client.utils.SparkRowDeserializer
 
-package object hudi {
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 
-  /**
-    * Adds a method, `hoodie`, to DataFrameWriter
-    */
-  implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
-    def avro: String => Unit = writer.format("org.apache.hudi").save
-  }
+class Spark3RowDeserializer(val encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer {
 
-  /**
-    * Adds a method, `hoodie`, to DataFrameReader
-    */
-  implicit class AvroDataFrameReader(reader: DataFrameReader) {
-    def avro: String => DataFrame = reader.format("org.apache.hudi").load
-  }
+  private val deserializer: ExpressionEncoder.Deserializer[Row] = encoder.createDeserializer()
 
+  def deserializeRow(internalRow: InternalRow): Row = {
+    deserializer.apply(internalRow)
+  }
 }
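Together with the Spark2RowDeserializer earlier in this patch, this file shows the seam the module split creates: ExpressionEncoder.fromRow disappeared in Spark 3 in favor of an explicit createDeserializer() step, so each version-specific module hides its API behind the shared SparkRowDeserializer interface from the client module. A rough sketch of a caller that stays version-agnostic (the toRows helper is illustrative, not from the patch):

    import org.apache.hudi.client.utils.SparkRowDeserializer
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow

    object RowConversionSketch {
      // The caller compiles against the interface only; whether a
      // Spark2RowDeserializer or a Spark3RowDeserializer sits behind it is
      // decided by which module ends up on the classpath.
      def toRows(internalRows: Seq[InternalRow], deserializer: SparkRowDeserializer): Seq[Row] =
        internalRows.map(deserializer.deserializeRow)
    }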
diff --git a/hudi-spark-datasource/pom.xml b/hudi-spark-datasource/pom.xml
new file mode 100644
index 0000000..2301e5f
--- /dev/null
+++ b/hudi-spark-datasource/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hudi</artifactId>
+    <groupId>org.apache.hudi</groupId>
+    <version>0.6.1-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hudi-spark-datasource</artifactId>
+  <packaging>pom</packaging>
+
+  <properties>
+    <main.basedir>${project.parent.basedir}</main.basedir>
+  </properties>
+
+  <modules>
+    <module>hudi-spark-common</module>
+    <module>hudi-spark</module>
+    <module>hudi-spark2</module>
+    <module>hudi-spark3</module>
+  </modules>
+</project>
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
deleted file mode 100644
index 26babd8..0000000
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import scala.collection.JavaConverters._
-
-
-object HoodieSparkUtils {
-
-  def getMetaSchema: StructType = {
-    StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
-      StructField(col, StringType, nullable = true)
-    }))
-  }
-
-  def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
-    paths.flatMap(path => {
-      val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-      globPaths
-    })
-  }
-
-  def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
-    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
-  }
-}
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index ab51475..c4a1606 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -120,6 +120,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-spark_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <exclusions>
@@ -129,6 +134,16 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark2_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark3_2.12</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <!-- Kafka -->
     <dependency>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index fb42775..7d5c9a4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -68,6 +67,7 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils;
 import org.apache.spark.sql.jdbc.JdbcDialect;
 import org.apache.spark.sql.jdbc.JdbcDialects;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.LongAccumulator;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -292,7 +292,7 @@ public class UtilHelpers {
   }
 
   public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {
-    Accumulator<Integer> errors = jsc.accumulator(0);
+    LongAccumulator errors = jsc.sc().longAccumulator();
     writeResponse.foreach(writeStatus -> {
       if (writeStatus.hasErrors()) {
         errors.add(1);
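The hunk above drops jsc.accumulator(0) and the Accumulator<Integer> type, both removed in Spark 3, in favor of the LongAccumulator API that Spark 2.x already ships, so one code path serves both versions. A minimal runnable sketch of the replacement (the modulo condition merely stands in for writeStatus.hasErrors()):

    import org.apache.spark.sql.SparkSession

    object AccumulatorSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("accumulator-sketch")
          .getOrCreate()
        val sc = spark.sparkContext

        // sc.longAccumulator(...) replaces the removed sc.accumulator(0).
        val errors = sc.longAccumulator("errors")

        sc.parallelize(1 to 100).foreach { i =>
          if (i % 10 == 0) errors.add(1) // stand-in for writeStatus.hasErrors()
        }

        println(s"error count: ${errors.value}") // prints 10
        spark.stop()
      }
    }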
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a27ce99..e17c1f0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
-import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -342,7 +342,7 @@ public class DeltaSync implements Serializable {
         // pass in the schema for the Row-to-Avro conversion
         // to avoid nullability mismatch between Avro schema and Row schema
         avroRDDOptional = transformed
-            .map(t -> AvroConversionUtils.createRdd(
+            .map(t -> HoodieSparkUtils.createRdd(
                 t, this.userProvidedSchemaProvider.getTargetSchema(),
                 HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
@@ -356,7 +356,7 @@ public class DeltaSync implements Serializable {
                     UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc)))
                 .orElse(dataAndCheckpoint.getSchemaProvider());
         avroRDDOptional = transformed
-            .map(t -> AvroConversionUtils.createRdd(
+            .map(t -> HoodieSparkUtils.createRdd(
                 t, HOODIE_RECORD_STRUCT_NAME, 
HOODIE_RECORD_NAMESPACE).toJavaRDD());
       }
     } else {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 505deed..379cc4b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -73,8 +74,8 @@ public final class SourceFormatAdapter {
                   // If the source schema is specified through Avro schema,
                   // pass in the schema for the Row-to-Avro conversion
                  // to avoid nullability mismatch between Avro schema and Row schema
-                  ? AvroConversionUtils.createRdd(rdd, r.getSchemaProvider().getSourceSchema(),
-                  HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() : AvroConversionUtils.createRdd(rdd,
+                  ? HoodieSparkUtils.createRdd(rdd, r.getSchemaProvider().getSourceSchema(),
+                  HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
                  HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD();
            })
            .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml
index 84285dc..2e27262 100644
--- a/packaging/hudi-integ-test-bundle/pom.xml
+++ b/packaging/hudi-integ-test-bundle/pom.xml
@@ -73,6 +73,8 @@
                  <include>org.apache.hudi:hudi-spark-client</include>
                  <include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
                  <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
+                  <include>org.apache.hudi:hudi-spark2_${scala.binary.version}</include>
+                  <include>org.apache.hudi:hudi-spark3_2.12</include>
                   <include>org.apache.hudi:hudi-hive-sync</include>
                   <include>org.apache.hudi:hudi-sync-common</include>
                   <include>org.apache.hudi:hudi-hadoop-mr</include>
@@ -340,6 +342,18 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark2_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark3_2.12</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
       <classifier>tests</classifier>
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index e4f4bbc..caa254a 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -66,7 +66,10 @@
                   <include>org.apache.hudi:hudi-common</include>
                   <include>org.apache.hudi:hudi-client-common</include>
                   <include>org.apache.hudi:hudi-spark-client</include>
+                  <include>org.apache.hudi:hudi-spark-common</include>
                  <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
+                  <include>org.apache.hudi:hudi-spark2_${scala.binary.version}</include>
+                  <include>org.apache.hudi:hudi-spark3_2.12</include>
                   <include>org.apache.hudi:hudi-hive-sync</include>
                   <include>org.apache.hudi:hudi-sync-common</include>
                   <include>org.apache.hudi:hudi-hadoop-mr</include>
@@ -222,11 +225,26 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-spark_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark2_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark3_2.12</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-timeline-service</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 39e48bb..8aec184 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -69,7 +69,10 @@
                   <include>org.apache.hudi:hudi-client-common</include>
                  <include>org.apache.hudi:hudi-spark-client</include>
                  <include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
+                  <include>org.apache.hudi:hudi-spark-common</include>
                  <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
+                  <include>org.apache.hudi:hudi-spark2_${scala.binary.version}</include>
+                  <include>org.apache.hudi:hudi-spark3_2.12</include>
                   <include>org.apache.hudi:hudi-hive-sync</include>
                   <include>org.apache.hudi:hudi-sync-common</include>
                   <include>org.apache.hudi:hudi-hadoop-mr</include>
@@ -105,6 +108,7 @@
                   <include>io.prometheus:simpleclient_common</include>
                  <include>com.yammer.metrics:metrics-core</include>
                  <include>org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version}</include>
+                  <include>org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version}</include>
                  <include>org.apache.kafka:kafka_${scala.binary.version}</include>
                   <include>com.101tec:zkclient</include>
                   <include>org.apache.kafka:kafka-clients</include>
@@ -229,11 +233,26 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-spark_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark2_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark3_2.12</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/pom.xml b/pom.xml
index 80876cf..8f17088 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@
     <module>hudi-cli</module>
     <module>hudi-client</module>
     <module>hudi-hadoop-mr</module>
-    <module>hudi-spark</module>
+    <module>hudi-spark-datasource</module>
     <module>hudi-timeline-service</module>
     <module>hudi-utilities</module>
     <module>hudi-sync</module>
@@ -84,6 +84,9 @@
 
     <java.version>1.8</java.version>
     <fasterxml.version>2.6.7</fasterxml.version>
+    <fasterxml.jackson.databind.version>2.6.7.3</fasterxml.jackson.databind.version>
+    <fasterxml.jackson.module.scala.version>2.6.7.1</fasterxml.jackson.module.scala.version>
+    <fasterxml.jackson.dataformat.yaml.version>2.7.4</fasterxml.jackson.dataformat.yaml.version>
     <kafka.version>2.0.0</kafka.version>
     <glassfish.version>2.17</glassfish.version>
     <parquet.version>1.10.1</parquet.version>
@@ -103,9 +106,12 @@
     <http.version>4.4.1</http.version>
     <spark.version>2.4.4</spark.version>
     <flink.version>1.11.2</flink.version>
+    <spark2.version>2.4.4</spark2.version>
+    <spark3.version>3.0.0</spark3.version>
     <avro.version>1.8.2</avro.version>
     <scala.version>2.11.12</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
+    <scala12.version>2.12.10</scala12.version>
     <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
     <scala-maven-plugin.version>3.3.1</scala-maven-plugin.version>
     <scalatest.version>3.0.1</scalatest.version>
@@ -432,7 +438,7 @@
       <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-databind</artifactId>
-        <version>${fasterxml.version}.3</version>
+        <version>${fasterxml.jackson.databind.version}</version>
       </dependency>
       <dependency>
         <groupId>com.fasterxml.jackson.datatype</groupId>
@@ -442,7 +448,7 @@
       <dependency>
         <groupId>com.fasterxml.jackson.module</groupId>
         <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-        <version>${fasterxml.version}.1</version>
+        <version>${fasterxml.jackson.module.scala.version}</version>
       </dependency>
 
       <!-- Glassfish -->
@@ -1306,7 +1312,7 @@
     <profile>
       <id>scala-2.12</id>
       <properties>
-        <scala.version>2.12.10</scala.version>
+        <scala.version>${scala12.version}</scala.version>
         <scala.binary.version>2.12</scala.binary.version>
       </properties>
       <activation>
@@ -1341,6 +1347,25 @@
         </plugins>
       </build>
     </profile>
+
+    <profile>
+      <id>spark3</id>
+      <properties>
+        <spark.version>${spark3.version}</spark.version>
+        <scala.version>${scala12.version}</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <kafka.version>2.4.1</kafka.version>
+        <fasterxml.version>2.10.0</fasterxml.version>
+        <fasterxml.jackson.databind.version>2.10.0</fasterxml.jackson.databind.version>
+        <fasterxml.jackson.module.scala.version>2.10.0</fasterxml.jackson.module.scala.version>
+        <fasterxml.jackson.dataformat.yaml.version>2.10.0</fasterxml.jackson.dataformat.yaml.version>
+      </properties>
+      <activation>
+        <property>
+          <name>spark3</name>
+        </property>
+      </activation>
+    </profile>
   </profiles>
 
 </project>
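One build-facing note on the final hunk: the new spark3 profile is activated by a Maven property, so per standard Maven behavior passing -Dspark3 on the command line (for example, something like `mvn clean install -Dspark3`; the exact goals and extra flags depend on how you normally build) flips the whole tree to Spark 3.0.0, Scala 2.12, Kafka 2.4.1, and the 2.10.0 Jackson line, while a plain build keeps producing artifacts against the Spark 2.4.4 defaults declared earlier in this file.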
