This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1949a90ff24 [HUDI-6999] Adding row writer support to HoodieStreamer 
(#9913)
1949a90ff24 is described below

commit 1949a90ff24d3d87cb4a24a30aefac6dcd672e84
Author: Jon Vexler <[email protected]>
AuthorDate: Mon Nov 6 11:10:24 2023 -0500

    [HUDI-6999] Adding row writer support to HoodieStreamer (#9913)
    
    - Fixing row writer with deltastreamer and refactoring StreamSync to accommodate both row and avro formats
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: sivabalan <[email protected]>
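
(For context, a minimal sketch of when the new row-writer path is taken, distilled from the
StreamSync constructor change in the diff below; cfg, props and source are placeholders for the
corresponding StreamSync fields.)

    // Sketch: all three conditions must hold, so the row writer stays off unless explicitly enabled.
    boolean useRowWriter =
        cfg.operation == WriteOperationType.BULK_INSERT                                    // bulk insert only
            && source.getSourceType() == Source.SourceType.ROW                             // source emits rows
            && props.getBoolean(DataSourceWriteOptions.ENABLE_ROW_WRITER().key(), false);  // opt-in config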
---
 .../testsuite/HoodieDeltaStreamerWrapper.java      |   4 +-
 ...eamerDatasetBulkInsertCommitActionExecutor.java |  66 +++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  23 +-
 .../hudi/utilities/streamer/HoodieStreamer.java    |  10 +-
 .../utilities/streamer/HoodieStreamerUtils.java    | 151 +++++++
 .../apache/hudi/utilities/streamer/StreamSync.java | 444 +++++++++++----------
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |   4 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 164 +++++++-
 .../TestHoodieDeltaStreamerDAGExecution.java       |   2 +-
 9 files changed, 644 insertions(+), 224 deletions(-)

diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 85987e36b5c..d8d5dad329d 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -25,6 +25,7 @@ import 
org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.streamer.StreamSync;
 
 import org.apache.spark.api.java.JavaRDD;
@@ -80,7 +81,8 @@ public class HoodieDeltaStreamerWrapper extends 
HoodieDeltaStreamer {
     StreamSync service = getDeltaSync();
     service.refreshTimeline();
     String instantTime = InProcessTimeGenerator.createNewInstantTime();
-    return service.readFromSource(instantTime);
+    InputBatch inputBatch = service.readFromSource(instantTime).getLeft();
+    return Pair.of(inputBatch.getSchemaProvider(), 
Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<HoodieRecord>) 
inputBatch.getBatch().get()));
   }
 
   public StreamSync getDeltaSync() {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
new file mode 100644
index 00000000000..5593a95ca39
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commit;
+
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+/**
+ * Executor used by StreamSync. Directly invokes HoodieDatasetBulkInsertHelper.bulkInsert so that WriteStatus is
+ * properly returned. The write is intentionally not committed here, because StreamSync performs the commit.
+ */
+public class HoodieStreamerDatasetBulkInsertCommitActionExecutor extends 
BaseDatasetBulkInsertCommitActionExecutor {
+
+  public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig 
config, SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
+  }
+
+  @Override
+  protected void preExecute() {
+    // no op
+  }
+
+  @Override
+  protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
result) {
+    // no op
+  }
+
+  @Override
+  protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, 
boolean arePartitionRecordsSorted) {
+    table.getActiveTimeline().transitionRequestedToInflight(new 
HoodieInstant(HoodieInstant.State.REQUESTED, getCommitActionType(), 
instantTime), Option.empty());
+    return Option.of(HoodieDatasetBulkInsertHelper
+        .bulkInsert(records, instantTime, table, writeConfig, 
arePartitionRecordsSorted, false));
+  }
+
+  @Override
+  public WriteOperationType getWriteOperationType() {
+    return WriteOperationType.BULK_INSERT;
+  }
+}
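
(Usage sketch for the new executor, mirroring how StreamSync.writeToSink invokes it later in this
diff; writeConfig, writeClient, df, instantTime and arePartitionRecordsSorted are placeholders.)

    // The executor transitions the requested instant to inflight and runs the dataset bulk insert,
    // but deliberately does not commit: StreamSync commits the returned write statuses itself.
    BaseDatasetBulkInsertCommitActionExecutor executor =
        new HoodieStreamerDatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, instantTime);
    JavaRDD<WriteStatus> writeStatuses = executor.execute(df, arePartitionRecordsSorted).getWriteStatuses();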
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index a63f861480d..93b8c2fcf91 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -149,6 +149,19 @@ object HoodieSparkSqlWriter {
     Metrics.shutdownAllMetrics()
   }
 
+  def getBulkInsertRowConfig(writerSchema: Schema, hoodieConfig: HoodieConfig,
+                             basePath: String, tblName: String): 
HoodieWriteConfig = {
+    val writerSchemaStr = writerSchema.toString
+
+    // Make opts mutable since it could be modified by 
tryOverrideParquetWriteLegacyFormatProperty
+    val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++
+      Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
+
+    // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
+    tryOverrideParquetWriteLegacyFormatProperty(opts, 
convertAvroSchemaToStructType(writerSchema))
+    DataSourceUtils.createHoodieConfig(writerSchemaStr, basePath, tblName, 
opts)
+  }
+
 }
 
 class HoodieSparkSqlWriterInternal {
@@ -940,15 +953,7 @@ class HoodieSparkSqlWriterInternal {
     val sqlContext = 
writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext
     val jsc = 
writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getJavaSparkContext
 
-    val writerSchemaStr = writerSchema.toString
-
-    // Make opts mutable since it could be modified by 
tryOverrideParquetWriteLegacyFormatProperty
-    val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++
-      Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
-
-    // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-    tryOverrideParquetWriteLegacyFormatProperty(opts, 
convertAvroSchemaToStructType(writerSchema))
-    val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, 
basePath.toString, tblName, opts)
+    val writeConfig = 
HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema, hoodieConfig, 
basePath.toString, tblName)
     val overwriteOperationType = 
Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE))
       .map(WriteOperationType.fromValue)
       .orNull
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 95534c5533f..5604a6240c7 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -93,6 +93,8 @@ import java.util.concurrent.Executors;
 
 import static java.lang.String.format;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.utilities.UtilHelpers.buildProperties;
+import static org.apache.hudi.utilities.UtilHelpers.readConfig;
 
 /**
  * An Utility which can incrementally take the output from {@link 
HiveIncrementalPuller} and apply it to the target
@@ -170,7 +172,7 @@ public class HoodieStreamer implements Serializable {
     } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) 
{
       hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
     } else {
-      hoodieConfig.setAll(UtilHelpers.readConfig(hadoopConf, new 
Path(cfg.propsFilePath), cfg.configs).getProps());
+      hoodieConfig.setAll(readConfig(hadoopConf, new Path(cfg.propsFilePath), 
cfg.configs).getProps());
     }
 
     // set any configs that Deltastreamer has to override explicitly
@@ -429,6 +431,12 @@ public class HoodieStreamer implements Serializable {
           && 
HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
     }
 
+    public static TypedProperties getProps(FileSystem fs, Config cfg) {
+      return cfg.propsFilePath.isEmpty()
+          ? buildProperties(cfg.configs)
+          : readConfig(fs.getConf(), new Path(cfg.propsFilePath), 
cfg.configs).getProps();
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
new file mode 100644
index 00000000000..ad1de230f41
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.SparkKeyGenUtils;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.avro.HoodieAvroDeserializer;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
+
+
+/**
+ * Util class for HoodieStreamer.
+ */
+public class HoodieStreamerUtils {
+
+  /**
+   * Generates HoodieRecords for the Avro data read from the source.
+   * Takes care of dropping partition columns, precombine, and auto key generation.
+   * Both AVRO and SPARK record types are supported.
+   */
+  static JavaRDD<HoodieRecord> createHoodieRecords(HoodieStreamer.Config cfg, 
TypedProperties props, Option<JavaRDD<GenericRecord>> avroRDDOptional,
+                                  SchemaProvider schemaProvider, 
HoodieRecord.HoodieRecordType recordType, boolean autoGenerateRecordKeys,
+                                  String instantTime) {
+    boolean shouldCombine = cfg.filterDupes || 
cfg.operation.equals(WriteOperationType.UPSERT);
+    Set<String> partitionColumns = getPartitionColumns(props);
+    JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+
+    JavaRDD<HoodieRecord> records;
+    SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetSchema());
+    SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns(props) ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
+    if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
+      records = avroRDD.mapPartitions(
+          (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) 
genericRecordIterator -> {
+            if (autoGenerateRecordKeys) {
+              
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
+              
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
+            }
+            BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+            List<HoodieRecord> avroRecords = new ArrayList<>();
+            while (genericRecordIterator.hasNext()) {
+              GenericRecord genRec = genericRecordIterator.next();
+              HoodieKey hoodieKey = new 
HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
builtinKeyGenerator.getPartitionPath(genRec));
+              GenericRecord gr = isDropPartitionColumns(props) ? 
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
+              HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+                  (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false, props.getBoolean(
+                      
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+                      
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+                  : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
+              avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
+            }
+            return avroRecords.iterator();
+          });
+    } else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+      // TODO we should remove it if we can read InternalRow from source.
+      records = avroRDD.mapPartitions(itr -> {
+        if (autoGenerateRecordKeys) {
+          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
+          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime);
+        }
+        BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+        StructType baseStructType = 
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
+        StructType targetStructType = isDropPartitionColumns(props) ? 
AvroConversionUtils
+            
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
 partitionColumns)) : baseStructType;
+        HoodieAvroDeserializer deserializer = 
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(),
 baseStructType);
+
+        return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec 
-> {
+          InternalRow row = (InternalRow) deserializer.deserialize(rec).get();
+          String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
+          String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
+          return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
+              HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false);
+        });
+      });
+    } else {
+      throw new UnsupportedOperationException(recordType.name());
+    }
+    return records;
+  }
+
+  /**
+   * Set based on hoodie.datasource.write.drop.partition.columns config.
+   * When set to true, will not write the partition columns into the table.
+   */
+  static Boolean isDropPartitionColumns(TypedProperties props) {
+    return props.getBoolean(DROP_PARTITION_COLUMNS.key(), 
DROP_PARTITION_COLUMNS.defaultValue());
+  }
+
+  /**
+   * Get the partition columns as a set of strings.
+   *
+   * @param props TypedProperties
+   * @return Set of partition columns.
+   */
+  static Set<String> getPartitionColumns(TypedProperties props) {
+    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+    return 
Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
+  }
+
+}
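
(A short sketch of the intended call site for the new helper, mirroring the non-row-writer path in
StreamSync.fetchFromSourceAndPrepareRecords later in this diff; variable names are placeholders.)

    // Convert the Avro batch from the source into HoodieRecords (drops partition columns,
    // applies precombine and auto key generation as configured).
    JavaRDD<HoodieRecord> records = HoodieStreamerUtils.createHoodieRecords(
        cfg, props, inputBatch.getBatch(), schemaProvider, recordType, autoGenerateRecordKeys, instantTime);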
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 936eef67a9b..82e058b7c0b 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -25,7 +25,6 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieConversionUtils;
 import org.apache.hudi.HoodieSparkSqlWriter;
 import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -33,18 +32,16 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.commit.BaseDatasetBulkInsertCommitActionExecutor;
+import 
org.apache.hudi.commit.HoodieStreamerDatasetBulkInsertCommitActionExecutor;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -58,8 +55,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -73,9 +68,7 @@ import org.apache.hudi.exception.HoodieMetaSyncException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.sync.common.util.SyncUtilHelpers;
@@ -97,6 +90,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaSet;
 import org.apache.hudi.utilities.schema.SimpleSchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.streamer.HoodieStreamer.Config;
 import org.apache.hudi.utilities.transform.Transformer;
 
@@ -107,18 +101,12 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.avro.HoodieAvroDeserializer;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,20 +118,17 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
@@ -265,6 +250,8 @@ public class StreamSync implements Serializable, Closeable {
 
   private final boolean autoGenerateRecordKeys;
 
+  private final boolean useRowWriter;
+
   @Deprecated
   public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, 
SchemaProvider schemaProvider,
                     TypedProperties props, JavaSparkContext jssc, FileSystem 
fs, Configuration conf,
@@ -297,13 +284,18 @@ public class StreamSync implements Serializable, 
Closeable {
       this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, 
sparkSession, props, hoodieSparkContext, fs);
       this.errorWriteFailureStrategy = 
ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
-    this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics),
-        this.errorTableWriter, Option.of(props));
+    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
+    this.formatAdapter = new SourceFormatAdapter(source, 
this.errorTableWriter, Option.of(props));
 
     this.transformer = 
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
         
Option.ofNullable(schemaProvider).map(SchemaProvider::getSourceSchema), 
this.errorTableWriter.isPresent());
-
+    if (this.cfg.operation == WriteOperationType.BULK_INSERT && 
source.getSourceType() == Source.SourceType.ROW
+        && 
this.props.getBoolean(DataSourceWriteOptions.ENABLE_ROW_WRITER().key(), false)) 
{
+      // enable row writer only when operation is BULK_INSERT, source type is ROW, and the row writer is explicitly enabled.
+      this.useRowWriter = true;
+    } else {
+      this.useRowWriter = false;
+    }
   }
 
   /**
@@ -382,7 +374,7 @@ public class StreamSync implements Serializable, Closeable {
             HoodieTableConfig.CDC_ENABLED.defaultValue()))
         
.setCDCSupplementalLoggingMode(props.getString(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(),
             HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.defaultValue()))
-        .setShouldDropPartitionColumns(isDropPartitionColumns())
+        
.setShouldDropPartitionColumns(HoodieStreamerUtils.isDropPartitionColumns(props))
         
.setHiveStylePartitioningEnable(props.getBoolean(HIVE_STYLE_PARTITIONING_ENABLE.key(),
             
Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())))
         
.setUrlEncodePartitioning(props.getBoolean(URL_ENCODE_PARTITIONING.key(),
@@ -408,19 +400,25 @@ public class StreamSync implements Serializable, 
Closeable {
         .build();
     String instantTime = metaClient.createNewInstantTime();
 
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
srcRecordsWithCkpt = readFromSource(instantTime);
+    Pair<InputBatch, Boolean> inputBatchIsEmptyPair = readFromSource(instantTime);
+
+    if (inputBatchIsEmptyPair != null) {
+      final JavaRDD<HoodieRecord> recordsFromSource;
+      if (useRowWriter) {
+        recordsFromSource = hoodieSparkContext.emptyRDD();
+      } else {
+        recordsFromSource = (JavaRDD<HoodieRecord>) 
inputBatchIsEmptyPair.getKey().getBatch().get();
+      }
 
-    if (srcRecordsWithCkpt != null) {
-      final JavaRDD<HoodieRecord> recordsFromSource = 
srcRecordsWithCkpt.getRight().getRight();
       // this is the first input batch. If schemaProvider not set, use it and 
register Avro Schema and start
       // compactor
       if (writeClient == null) {
-        this.schemaProvider = srcRecordsWithCkpt.getKey();
+        this.schemaProvider = 
inputBatchIsEmptyPair.getKey().getSchemaProvider();
         // Setup HoodieWriteClient and compaction now that we decided on schema
         setupWriteClient(recordsFromSource);
       } else {
-        Schema newSourceSchema = srcRecordsWithCkpt.getKey().getSourceSchema();
-        Schema newTargetSchema = srcRecordsWithCkpt.getKey().getTargetSchema();
+        Schema newSourceSchema = 
inputBatchIsEmptyPair.getKey().getSchemaProvider().getSourceSchema();
+        Schema newTargetSchema = 
inputBatchIsEmptyPair.getKey().getSchemaProvider().getTargetSchema();
         if (!(processedSchema.isSchemaPresent(newSourceSchema))
             || !(processedSchema.isSchemaPresent(newTargetSchema))) {
           LOG.info("Seeing new schema. Source :" + 
newSourceSchema.toString(true)
@@ -449,8 +447,7 @@ public class StreamSync implements Serializable, Closeable {
         }
       }
 
-      result = writeToSink(instantTime, recordsFromSource,
-          srcRecordsWithCkpt.getRight().getLeft(), metrics, 
overallTimerContext);
+      result = writeToSinkAndDoMetaSync(instantTime, 
inputBatchIsEmptyPair.getKey(), inputBatchIsEmptyPair.getValue(), metrics, 
overallTimerContext);
     }
 
     metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
@@ -476,11 +473,10 @@ public class StreamSync implements Serializable, 
Closeable {
   /**
    * Read from Upstream Source and apply transformation if needed.
    *
-   * @return Pair<SchemaProvider, Pair < String, JavaRDD < HoodieRecord>>> 
Input data read from upstream source, consists
-   * of schemaProvider, checkpointStr and hoodieRecord
+   * @return Pair of {@link InputBatch} and Boolean; the Boolean is true if the batch read is empty.
    * @throws Exception in case of any Exception
    */
-  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
readFromSource(String instantTime) throws IOException {
+  public Pair<InputBatch, Boolean> readFromSource(String instantTime) throws 
IOException {
     // Retrieve the previous round checkpoints, if any
     Option<String> resumeCheckpointStr = Option.empty();
     if (commitsTimelineOpt.isPresent()) {
@@ -495,10 +491,10 @@ public class StreamSync implements Serializable, 
Closeable {
 
     int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
     int curRetryCount = 0;
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> sourceDataToSync 
= null;
+    Pair<InputBatch, Boolean> sourceDataToSync = null;
     while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
       try {
-        sourceDataToSync = fetchFromSource(resumeCheckpointStr, instantTime);
+        sourceDataToSync = 
fetchFromSourceAndPrepareRecords(resumeCheckpointStr, instantTime);
       } catch (HoodieSourceTimeoutException e) {
         if (curRetryCount >= maxRetryCount) {
           throw e;
@@ -515,17 +511,54 @@ public class StreamSync implements Serializable, 
Closeable {
     return sourceDataToSync;
   }
 
-  private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
fetchFromSource(Option<String> resumeCheckpointStr, String instantTime) {
+  private Pair<InputBatch, Boolean> 
fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String 
instantTime) {
     HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && 
HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
+        && !cfg.operation.equals(WriteOperationType.BULK_INSERT)
         && 
HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
 "avro"))
         != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
       throw new UnsupportedOperationException("Spark record only support 
parquet log.");
     }
 
-    final Option<JavaRDD<GenericRecord>> avroRDDOptional;
-    final String checkpointStr;
-    SchemaProvider schemaProvider;
+    InputBatch inputBatch = fetchNextBatchFromSource(resumeCheckpointStr);
+    final String checkpointStr = inputBatch.getCheckpointForNextBatch();
+    final SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
+
+    // handle no new data and no change in checkpoint
+    if (!cfg.allowCommitOnNoCheckpointChange && Objects.equals(checkpointStr, 
resumeCheckpointStr.orElse(null))) {
+      LOG.info("No new data, source checkpoint has not changed. Nothing to 
commit. Old checkpoint=("
+          + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
+      String commitActionType = CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
+      hoodieMetrics.updateMetricsForEmptyData(commitActionType);
+      return null;
+    }
+
+    // handle empty batch with change in checkpoint
+    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking 
if input is empty");
+    Pair<InputBatch, Boolean> preparedInputBatchIsEmptyPair = 
handleEmptyBatch(useRowWriter, inputBatch, checkpointStr, schemaProvider);
+    if (preparedInputBatchIsEmptyPair.getValue()) { // return if empty batch
+      return preparedInputBatchIsEmptyPair;
+    }
+
+    if (useRowWriter) { // no additional processing required for row writer.
+      return Pair.of(inputBatch, false);
+    } else {
+      JavaRDD<HoodieRecord> records = 
HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), 
schemaProvider,
+          recordType, autoGenerateRecordKeys, instantTime);
+      return Pair.of(new InputBatch(Option.of(records), checkpointStr, 
schemaProvider), false);
+    }
+  }
+
+  /**
+   * Fetch data from source, apply transformations if any, align with schema 
from schema provider if need be and return the input batch.
+   * @param resumeCheckpointStr checkpoint to resume from source.
+   * @return {@link InputBatch} containing the new batch of data from source 
along with new checkpoint and schema provider instance to use.
+   */
+  private InputBatch fetchNextBatchFromSource(Option<String> 
resumeCheckpointStr) {
+    Option<JavaRDD<GenericRecord>> avroRDDOptional = null;
+    String checkpointStr = null;
+    SchemaProvider schemaProvider = null;
+    InputBatch inputBatchForWriter = null; // row writer
     if (transformer.isPresent()) {
       // Transformation is needed. Fetch New rows in Row Format, apply 
transformation and then convert them
       // to generic records for writing
@@ -541,29 +574,37 @@ public class StreamSync implements Serializable, 
Closeable {
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = 
props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && 
this.userProvidedSchemaProvider.getTargetSchema() != null) {
-        // If the target schema is specified through Avro schema,
-        // pass in the schema for the Row-to-Avro conversion
-        // to avoid nullability mismatch between Avro schema and Row schema
-        if (errorTableWriter.isPresent()
-            && 
props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
-            
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue())) {
-          // If the above conditions are met, trigger error events for the 
rows whose conversion to
-          // avro records fails.
-          avroRDDOptional = transformed.map(
-              rowDataset -> {
-                Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = 
HoodieSparkUtils.safeCreateRDD(rowDataset,
-                    HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, 
reconcileSchema,
-                    
Option.of(this.userProvidedSchemaProvider.getTargetSchema()));
-                
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
-                    .map(evStr -> new ErrorEvent<>(evStr,
-                        ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
-                return safeCreateRDDs._1.toJavaRDD();
-              });
+        if (useRowWriter) {
+          if (errorTableWriter.isPresent()) {
+            throw new HoodieException("Error table is not yet supported with 
row writer");
+          }
+          inputBatchForWriter = new InputBatch(transformed, checkpointStr, 
this.userProvidedSchemaProvider);
         } else {
-          avroRDDOptional = transformed.map(
-              rowDataset -> getTransformedRDD(rowDataset, reconcileSchema, 
this.userProvidedSchemaProvider.getTargetSchema()));
+          // non row writer path
+          // If the target schema is specified through Avro schema,
+          // pass in the schema for the Row-to-Avro conversion
+          // to avoid nullability mismatch between Avro schema and Row schema
+          if (errorTableWriter.isPresent()
+              && 
props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+              
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue())) {
+            // If the above conditions are met, trigger error events for the 
rows whose conversion to
+            // avro records fails.
+            avroRDDOptional = transformed.map(
+                rowDataset -> {
+                  Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = 
HoodieSparkUtils.safeCreateRDD(rowDataset,
+                      HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, 
reconcileSchema,
+                      
Option.of(this.userProvidedSchemaProvider.getTargetSchema()));
+                  
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
+                      .map(evStr -> new ErrorEvent<>(evStr,
+                          
ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
+                  return safeCreateRDDs._1.toJavaRDD();
+                });
+          } else {
+            avroRDDOptional = transformed.map(
+                rowDataset -> getTransformedRDD(rowDataset, reconcileSchema, 
this.userProvidedSchemaProvider.getTargetSchema()));
+          }
+          schemaProvider = this.userProvidedSchemaProvider;
         }
-        schemaProvider = this.userProvidedSchemaProvider;
       } else {
         Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, 
cfg.targetBasePath);
         // Deduce proper target (writer's) schema for the transformed dataset, 
reconciling its
@@ -584,87 +625,59 @@ public class StreamSync implements Serializable, 
Closeable {
                 (SchemaProvider) new DelegatingSchemaProvider(props, 
hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
                     new SimpleSchemaProvider(hoodieSparkContext.jsc(), 
targetSchema, props)))
             .orElse(dataAndCheckpoint.getSchemaProvider());
-        // Rewrite transformed records into the expected target schema
-        avroRDDOptional = transformed.map(t -> getTransformedRDD(t, 
reconcileSchema, schemaProvider.getTargetSchema()));
+        if (useRowWriter) {
+          inputBatchForWriter = new InputBatch(transformed, checkpointStr, 
schemaProvider);
+        } else {
+          // Rewrite transformed records into the expected target schema
+          SchemaProvider finalSchemaProvider = schemaProvider;
+          avroRDDOptional = transformed.map(t -> getTransformedRDD(t, 
reconcileSchema, finalSchemaProvider.getTargetSchema()));
+        }
       }
     } else {
-      // Pull the data from the source & prepare the write
-      InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
-          formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, 
cfg.sourceLimit);
-      avroRDDOptional = dataAndCheckpoint.getBatch();
-      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      schemaProvider = dataAndCheckpoint.getSchemaProvider();
+      if (useRowWriter) {
+        inputBatchForWriter = 
formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
+      } else {
+        // Pull the data from the source & prepare the write
+        InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
+            formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, 
cfg.sourceLimit);
+        avroRDDOptional = dataAndCheckpoint.getBatch();
+        checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
+        schemaProvider = dataAndCheckpoint.getSchemaProvider();
+      }
     }
 
-    if (!cfg.allowCommitOnNoCheckpointChange && Objects.equals(checkpointStr, 
resumeCheckpointStr.orElse(null))) {
-      LOG.info("No new data, source checkpoint has not changed. Nothing to 
commit. Old checkpoint=("
-          + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
-      String commitActionType = CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
-      hoodieMetrics.updateMetricsForEmptyData(commitActionType);
-      return null;
+    if (useRowWriter) {
+      return inputBatchForWriter;
+    } else {
+      return new InputBatch(avroRDDOptional, checkpointStr, schemaProvider);
     }
+  }
 
+  /**
+   * Handles empty batch from input.
+   * @param useRowWriter true if using the row writer code path.
+   * @param inputBatch {@link InputBatch} instance to use.
+   * @param checkpointForNextBatch checkpoint to use for the next batch.
+   * @param schemaProvider {@link SchemaProvider} instance of interest.
+   * @return a Pair of InputBatch and Boolean; the Boolean is true when the batch is empty.
+   */
+  private Pair<InputBatch, Boolean> handleEmptyBatch(boolean useRowWriter, 
InputBatch inputBatch,
+                                              String checkpointForNextBatch, 
SchemaProvider schemaProvider) {
     hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking 
if input is empty");
-    if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
-      LOG.info("No new data, perform empty commit.");
-      return Pair.of(schemaProvider, Pair.of(checkpointStr, 
hoodieSparkContext.emptyRDD()));
-    }
-
-    boolean shouldCombine = cfg.filterDupes || 
cfg.operation.equals(WriteOperationType.UPSERT);
-    Set<String> partitionColumns = getPartitionColumns(props);
-    JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
-
-    JavaRDD<HoodieRecord> records;
-    SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetSchema());
-    SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns() ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
-    if (recordType == HoodieRecordType.AVRO) {
-      records = avroRDD.mapPartitions(
-          (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) 
genericRecordIterator -> {
-            if (autoGenerateRecordKeys) {
-              
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
-              
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
-            }
-            BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-            List<HoodieRecord> avroRecords = new ArrayList<>();
-            while (genericRecordIterator.hasNext()) {
-              GenericRecord genRec = genericRecordIterator.next();
-              HoodieKey hoodieKey = new 
HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
builtinKeyGenerator.getPartitionPath(genRec));
-              GenericRecord gr = isDropPartitionColumns() ? 
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
-              HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-                  (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false, props.getBoolean(
-                      
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                      
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
-                  : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
-              avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
-            }
-            return avroRecords.iterator();
-          });
-    } else if (recordType == HoodieRecordType.SPARK) {
-      // TODO we should remove it if we can read InternalRow from source.
-      records = avroRDD.mapPartitions(itr -> {
-        if (autoGenerateRecordKeys) {
-          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
-          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime);
-        }
-        BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-        StructType baseStructType = 
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
-        StructType targetStructType = isDropPartitionColumns() ? 
AvroConversionUtils
-            
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
 partitionColumns)) : baseStructType;
-        HoodieAvroDeserializer deserializer = 
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(),
 baseStructType);
-
-        return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec 
-> {
-          InternalRow row = (InternalRow) deserializer.deserialize(rec).get();
-          String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
-          String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
-          return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
-              HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false);
-        });
-      });
+    if (useRowWriter) {
+      Option<Dataset<Row>> rowDatasetOptional = inputBatch.getBatch();
+      if ((!rowDatasetOptional.isPresent()) || 
(rowDatasetOptional.get().isEmpty())) {
+        LOG.info("No new data, perform empty commit.");
+        return Pair.of(new 
InputBatch<>(Option.of(sparkSession.emptyDataFrame()), checkpointForNextBatch, 
schemaProvider), true);
+      }
     } else {
-      throw new UnsupportedOperationException(recordType.name());
+      Option<JavaRDD<GenericRecord>> avroRDDOptional = inputBatch.getBatch();
+      if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) 
{
+        LOG.info("No new data, perform empty commit.");
+        return Pair.of(new 
InputBatch(Option.of(hoodieSparkContext.emptyRDD()), checkpointForNextBatch, 
schemaProvider), true);
+      }
     }
-
-    return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
+    return Pair.of(inputBatch, false);
   }
 
   private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset, 
boolean reconcileSchema, Schema readerSchema) {
@@ -750,70 +763,44 @@ public class StreamSync implements Serializable, 
Closeable {
     }).orElse(Option.empty());
   }
 
+  private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema 
writerSchema) {
+    HoodieConfig hoodieConfig = new 
HoodieConfig(HoodieStreamer.Config.getProps(fs, cfg));
+    hoodieConfig.setValue(DataSourceWriteOptions.TABLE_TYPE(), cfg.tableType);
+    hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), 
cfg.payloadClassName);
+    hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), 
HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props));
+    hoodieConfig.setValue("path", cfg.targetBasePath);
+    return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema, 
hoodieConfig, cfg.targetBasePath, cfg.targetTableName);
+  }
+
   /**
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive 
if needed.
    *
    * @param instantTime         instant time to use for ingest.
-   * @param records             Input Records
-   * @param checkpointStr       Checkpoint String
+   * @param inputBatch          input batch that contains the records, 
checkpoint, and schema provider
+   * @param inputIsEmpty             true if input batch is empty.
    * @param metrics             Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
-  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String 
instantTime, JavaRDD<HoodieRecord> records, String checkpointStr,
-                                                                 
HoodieIngestionMetrics metrics,
-                                                                 Timer.Context 
overallTimerContext) {
+  private Pair<Option<String>, JavaRDD<WriteStatus>> 
writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, boolean 
inputIsEmpty,
+                                                                              
HoodieIngestionMetrics metrics,
+                                                                              
Timer.Context overallTimerContext) {
     Option<String> scheduledCompactionInstant = Option.empty();
-    // filter dupes if needed
-    if (cfg.filterDupes) {
-      records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), 
records, writeClient.getConfig());
-    }
-
-    boolean isEmpty = records.isEmpty();
-    instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
-    LOG.info("Starting commit  : " + instantTime);
-
-    HoodieWriteResult writeResult;
-    Map<String, List<String>> partitionToReplacedFileIds = 
Collections.emptyMap();
-    JavaRDD<WriteStatus> writeStatusRDD;
-    switch (cfg.operation) {
-      case INSERT:
-        writeStatusRDD = writeClient.insert(records, instantTime);
-        break;
-      case UPSERT:
-        writeStatusRDD = writeClient.upsert(records, instantTime);
-        break;
-      case BULK_INSERT:
-        writeStatusRDD = writeClient.bulkInsert(records, instantTime);
-        break;
-      case INSERT_OVERWRITE:
-        writeResult = writeClient.insertOverwrite(records, instantTime);
-        partitionToReplacedFileIds = 
writeResult.getPartitionToReplaceFileIds();
-        writeStatusRDD = writeResult.getWriteStatuses();
-        break;
-      case INSERT_OVERWRITE_TABLE:
-        writeResult = writeClient.insertOverwriteTable(records, instantTime);
-        partitionToReplacedFileIds = 
writeResult.getPartitionToReplaceFileIds();
-        writeStatusRDD = writeResult.getWriteStatuses();
-        break;
-      case DELETE_PARTITION:
-        List<String> partitions = records.map(record -> 
record.getPartitionPath()).distinct().collect();
-        writeResult = writeClient.deletePartitions(partitions, instantTime);
-        partitionToReplacedFileIds = 
writeResult.getPartitionToReplaceFileIds();
-        writeStatusRDD = writeResult.getWriteStatuses();
-        break;
-      default:
-        throw new HoodieStreamerException("Unknown operation : " + 
cfg.operation);
-    }
+    // write to hudi and fetch result
+    Pair<WriteClientWriteResult, Boolean>  writeClientWriteResultIsEmptyPair = 
writeToSink(inputBatch, instantTime, inputIsEmpty);
+    JavaRDD<WriteStatus> writeStatusRDD = 
writeClientWriteResultIsEmptyPair.getKey().getWriteStatusRDD();
+    Map<String, List<String>> partitionToReplacedFileIds = 
writeClientWriteResultIsEmptyPair.getKey().getPartitionToReplacedFileIds();
+    boolean isEmpty = writeClientWriteResultIsEmptyPair.getRight();
 
+    // process write status
     long totalErrorRecords = 
writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
     long totalRecords = 
writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
     boolean hasErrors = totalErrorRecords > 0;
     if (!hasErrors || cfg.commitOnErrors) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
       if (!getBooleanWithAltKeys(props, CHECKPOINT_FORCE_SKIP)) {
-        if (checkpointStr != null) {
-          checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
+        if (inputBatch.getCheckpointForNextBatch() != null) {
+          checkpointCommitMetadata.put(CHECKPOINT_KEY, 
inputBatch.getCheckpointForNextBatch());
         }
         if (cfg.checkpoint != null) {
           checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
@@ -846,7 +833,7 @@ public class StreamSync implements Serializable, Closeable {
       boolean success = writeClient.commit(instantTime, writeStatusRDD, 
Option.of(checkpointCommitMetadata), commitActionType, 
partitionToReplacedFileIds, Option.empty());
       if (success) {
         LOG.info("Commit " + instantTime + " successful!");
-        this.formatAdapter.getSource().onCommit(checkpointStr);
+        
this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch());
         // Schedule compaction if needed
         if (cfg.isAsyncCompactionEnabled()) {
           scheduledCompactionInstant = 
writeClient.scheduleCompaction(Option.empty());
@@ -913,6 +900,58 @@ public class StreamSync implements Serializable, Closeable 
{
     throw lastException;
   }
 
+  private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch 
inputBatch, String instantTime, boolean inputIsEmpty) {
+    WriteClientWriteResult writeClientWriteResult = null;
+    instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
+    boolean isEmpty = inputIsEmpty;
+
+    if (useRowWriter) {
+      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().get();
+      HoodieWriteConfig hoodieWriteConfig = 
prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
+      BaseDatasetBulkInsertCommitActionExecutor executor = new 
HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, 
writeClient, instantTime);
+      writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, 
!HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
+    } else {
+      JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) 
inputBatch.getBatch().get();
+      // filter dupes if needed
+      if (cfg.filterDupes) {
+        records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), 
records, writeClient.getConfig());
+        isEmpty = records.isEmpty();
+      }
+
+      HoodieWriteResult writeResult = null;
+      switch (cfg.operation) {
+        case INSERT:
+          writeClientWriteResult = new 
WriteClientWriteResult(writeClient.insert(records, instantTime));
+          break;
+        case UPSERT:
+          writeClientWriteResult = new 
WriteClientWriteResult(writeClient.upsert(records, instantTime));
+          break;
+        case BULK_INSERT:
+          writeClientWriteResult = new 
WriteClientWriteResult(writeClient.bulkInsert(records, instantTime));
+          break;
+        case INSERT_OVERWRITE:
+          writeResult = writeClient.insertOverwrite(records, instantTime);
+          writeClientWriteResult = new 
WriteClientWriteResult(writeResult.getWriteStatuses());
+          
writeClientWriteResult.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
+          break;
+        case INSERT_OVERWRITE_TABLE:
+          writeResult = writeClient.insertOverwriteTable(records, instantTime);
+          writeClientWriteResult = new 
WriteClientWriteResult(writeResult.getWriteStatuses());
+          
writeClientWriteResult.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
+          break;
+        case DELETE_PARTITION:
+          List<String> partitions = records.map(record -> 
record.getPartitionPath()).distinct().collect();
+          writeResult = writeClient.deletePartitions(partitions, instantTime);
+          writeClientWriteResult = new 
WriteClientWriteResult(writeResult.getWriteStatuses());
+          
writeClientWriteResult.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
+          break;
+        default:
+          throw new HoodieStreamerException("Unknown operation : " + 
cfg.operation);
+      }
+    }
+    return Pair.of(writeClientWriteResult, isEmpty);
+  }
+
   private String getSyncClassShortName(String syncClassName) {
     return syncClassName.substring(syncClassName.lastIndexOf(".") + 1);
   }
@@ -969,8 +1008,8 @@ public class StreamSync implements Serializable, Closeable {
 
   private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD<HoodieRecord> records) throws IOException {
     LOG.info("Setting up new Hoodie Write Client");
-    if (isDropPartitionColumns()) {
-      targetSchema = HoodieAvroUtils.removeFields(targetSchema, getPartitionColumns(props));
+    if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
+      targetSchema = HoodieAvroUtils.removeFields(targetSchema, HoodieStreamerUtils.getPartitionColumns(props));
     }
     registerAvroSchemas(sourceSchema, targetSchema);
     final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
@@ -1191,22 +1230,25 @@ public class StreamSync implements Serializable, Closeable {
     }
   }
 
-  /**
-   * Set based on hoodie.datasource.write.drop.partition.columns config.
-   * When set to true, will not write the partition columns into the table.
-   */
-  private Boolean isDropPartitionColumns() {
-    return props.getBoolean(DROP_PARTITION_COLUMNS.key(), DROP_PARTITION_COLUMNS.defaultValue());
-  }
+  class WriteClientWriteResult {
+    private Map<String, List<String>> partitionToReplacedFileIds = Collections.emptyMap();
+    private JavaRDD<WriteStatus> writeStatusRDD;
 
-  /**
-   * Get the partition columns as a set of strings.
-   *
-   * @param props TypedProperties
-   * @return Set of partition columns.
-   */
-  private Set<String> getPartitionColumns(TypedProperties props) {
-    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
-    return Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
+    public WriteClientWriteResult(JavaRDD<WriteStatus> writeStatusRDD) {
+      this.writeStatusRDD = writeStatusRDD;
+    }
+
+    public Map<String, List<String>> getPartitionToReplacedFileIds() {
+      return partitionToReplacedFileIds;
+    }
+
+    public void setPartitionToReplacedFileIds(Map<String, List<String>> partitionToReplacedFileIds) {
+      this.partitionToReplacedFileIds = partitionToReplacedFileIds;
+    }
+
+    public JavaRDD<WriteStatus> getWriteStatusRDD() {
+      return writeStatusRDD;
+    }
   }
+
 }
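
As exercised by the new tests further down, the row-writer path is switched on purely through a write config. A minimal, illustrative sketch of driving it programmatically is shown below; the base path, table name, ordering field and the local Spark context are placeholders, and source-specific properties would still come from the usual properties file rather than this snippet.

    import org.apache.hudi.DataSourceWriteOptions;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
    import org.apache.hudi.utilities.sources.ParquetDFSSource;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Illustrative sketch only: bulk insert through the new row-writer path.
    // Base path, table name and Spark master below are placeholders.
    public class RowWriterStreamerSketch {
      public static void main(String[] args) throws Exception {
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setAppName("row-writer-sketch").setMaster("local[2]"));
        HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
        cfg.targetBasePath = "/tmp/hudi/row_writer_table";    // placeholder
        cfg.targetTableName = "row_writer_table";             // placeholder
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.operation = WriteOperationType.BULK_INSERT;
        cfg.sourceClassName = ParquetDFSSource.class.getName();
        cfg.sourceOrderingField = "timestamp";                // placeholder ordering field
        // source root, key generator fields etc. are expected via cfg.propsFilePath
        // the row writer is opted into purely through this write config
        cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() + "=true");
        new HoodieDeltaStreamer(cfg, jsc).sync();
      }
    }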
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 94f42e3713a..095a34cf75c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -399,7 +399,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     }
   }
 
-  static List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
+  static List<String> getTableServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
                                               String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
     List<String> configs = new ArrayList<>();
     configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
@@ -634,7 +634,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
         boolean ret = false;
         while (!ret && !dsFuture.isDone()) {
           try {
-            Thread.sleep(3000);
+            Thread.sleep(2000);
             ret = condition.apply(true);
           } catch (Throwable error) {
             LOG.warn("Got error :", error);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d1cb3f1b240..c5133a314fd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -209,7 +209,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", asyncCluster, ""));
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", asyncCluster, ""));
     cfg.configs.addAll(getAllMultiWriterConfigs());
     customConfigs.forEach(config -> cfg.configs.add(config));
     return new HoodieDeltaStreamer(cfg, jsc);
@@ -783,7 +783,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true", "2", "", ""));
     cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
@@ -817,7 +817,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest, Option.empty());
 
     // do another ingestion with inline clustering enabled
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true", "2", "", ""));
     cfg.retryLastPendingInlineClusteringJob = true;
     HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
     ds2.sync();
@@ -884,7 +884,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true", "2", "", ""));
     cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
     cfg.configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
     cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
@@ -934,7 +934,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertFalse(replacedFilePaths.isEmpty());
 
     // Step 4 : Add commits with insert of 1 record and trigger sync/async cleaner and archive.
-    List<String> configs = getAsyncServicesConfigs(1, "true", "true", "6", "", "");
+    List<String> configs = getTableServicesConfigs(1, "true", "true", "6", "", "");
     configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
     configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
     configs.add(String.format("%s=%s", HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4"));
@@ -1129,7 +1129,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3"));
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", "true", "3"));
     cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
@@ -1165,7 +1165,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", "true", "2"));
     cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
@@ -1193,7 +1193,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3"));
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", "true", "3"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);
@@ -1218,7 +1218,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = false;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false", "0", "false", "0"));
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "false", "0", "false", "0"));
     cfg.configs.addAll(getAllMultiWriterConfigs());
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
@@ -1306,6 +1306,152 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     }
   }
 
+  @Test
+  public void testBulkInsertRowWriterNoSchemaProviderNoTransformer() throws Exception {
+    testBulkInsertRowWriterMultiBatches(false, null);
+  }
+
+  @Test
+  public void testBulkInsertRowWriterWithoutSchemaProviderAndTransformer() throws Exception {
+    testBulkInsertRowWriterMultiBatches(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
+  }
+
+  @Test
+  public void testBulkInsertRowWriterWithSchemaProviderAndNoTransformer() throws Exception {
+    testBulkInsertRowWriterMultiBatches(true, null);
+  }
+
+  @Test
+  public void testBulkInsertRowWriterWithSchemaProviderAndTransformer() throws Exception {
+    testBulkInsertRowWriterMultiBatches(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
+  }
+
+  @Test
+  public void testBulkInsertRowWriterForEmptyBatch() throws Exception {
+    testBulkInsertRowWriterMultiBatches(false, null, true);
+  }
+
+  private void testBulkInsertRowWriterMultiBatches(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    testBulkInsertRowWriterMultiBatches(useSchemaProvider, transformerClassNames, false);
+  }
+
+  private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 100;
+    boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
+
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
+            : ParquetDFSSource.class.getName(),
+        transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
+        useSchemaProvider, 100000, false, null, null, "timestamp", null);
+    cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() + "=true");
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamer.sync();
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+
+    try {
+      if (testEmptyBatch) {
+        prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
+        deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
+        deltaStreamer.sync();
+        // since we mimicked an empty batch, total records should be same as first sync().
+        assertRecordCount(200, tableBasePath, sqlContext);
+        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+
+        // validate that the table schema fetches a valid schema from the last but one commit.
+        TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+        assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString());
+      }
+
+      int recordsSoFar = testEmptyBatch ? 200 : 100;
+
+      // add 3 more batches and ensure all commits succeed.
+      for (int i = 2; i < 5; i++) {
+        prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(i) + ".parquet", false, null, null);
+        deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
+        deltaStreamer.sync();
+        assertRecordCount(recordsSoFar + (i - 1) * 100, tableBasePath, sqlContext);
+        if (i == 2 || i == 4) { // this validation reloads the timeline. So, we validate only the first and last batch.
+          // validate commit metadata for all completed commits to have valid schema in extra metadata.
+          HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+          metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient));
+        }
+      }
+    } finally {
+      deltaStreamer.shutdownGracefully();
+    }
+    testNum++;
+  }
+
+  @Test
+  public void testBulkInsertRowWriterContinuousModeWithAsyncClustering() throws Exception {
+    testBulkInsertRowWriterContinuousMode(false, null, false,
+        getTableServicesConfigs(2000, "false", "", "", "true", "3"));
+  }
+
+  @Test
+  public void testBulkInsertRowWriterContinuousModeWithInlineClustering() throws Exception {
+    testBulkInsertRowWriterContinuousMode(false, null, false,
+        getTableServicesConfigs(2000, "false", "true", "3", "false", ""));
+  }
+
+  private void testBulkInsertRowWriterContinuousMode(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch, List<String> customConfigs) throws Exception {
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 100;
+    boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
+
+    // generate data asynchronously.
+    Future inputGenerationFuture = Executors.newSingleThreadExecutor().submit(() -> {
+      try {
+        int counter = 2;
+        while (counter < 100) { // let's keep going; if the test times out, we will cancel the future in the finally block, so it is safe to generate 100 batches.
+          LOG.info("Generating data for batch " + counter);
+          prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(counter) + ".parquet", false, null, null);
+          counter++;
+          Thread.sleep(2000);
+        }
+      } catch (Exception ex) {
+        LOG.warn("Input data generation failed", ex);
+        throw new RuntimeException(ex.getMessage(), ex);
+      }
+    });
+
+    // initialize configs for the continuous deltastreamer run
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
+            : ParquetDFSSource.class.getName(),
+        transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
+        useSchemaProvider, 100000, false, null, null, "timestamp", null);
+    cfg.continuousMode = true;
+    cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() + "=true");
+    cfg.configs.addAll(customConfigs);
+
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    // trigger continuous DS and wait until 1 replace commit is complete.
+    try {
+      deltaStreamerTestRunner(ds, cfg, (r) -> {
+        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+        return true;
+      });
+      // There should be at least 4 commits, one of which should be a replace commit
+      TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+    } finally {
+      // clean up resources
+      ds.shutdownGracefully();
+      inputGenerationFuture.cancel(true);
+      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+    }
+    testNum++;
+  }
+
   /**
    * Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline The first
    * step involves using a SQL template to transform a source TEST-DATA-SOURCE ============================> HUDI TABLE
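
The empty-batch test above validates the resolved table schema through TableSchemaResolver. A condensed, standalone sketch of that same check follows; the base path and Hadoop configuration are placeholders, not part of this patch.

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;

    // Sketch of the schema sanity check used after an empty batch: the schema
    // resolved from the timeline should never be the NULL schema.
    public class TableSchemaCheckSketch {
      public static void main(String[] args) throws Exception {
        String tableBasePath = "/tmp/hudi/row_writer_table";   // placeholder
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setBasePath(tableBasePath)
            .setConf(new Configuration())                      // placeholder Hadoop conf
            .build();
        Schema tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
        if (tableSchema.toString().equals(Schema.create(Schema.Type.NULL).toString())) {
          throw new IllegalStateException("Table schema resolved to the NULL schema");
        }
      }
    }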
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
index 53e1733c9a6..48a8a7100ff 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
@@ -61,7 +61,7 @@ public class TestHoodieDeltaStreamerDAGExecution extends HoodieDeltaStreamerTest
     // Configure 3 transformers of same type. 2nd transformer has no suffix
     StageListener stageListener = new StageListener("org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering");
     sparkSession.sparkContext().addSparkListener(stageListener);
-    List<String> configs = getAsyncServicesConfigs(100, "false", "true", "1", "", "");
+    List<String> configs = getTableServicesConfigs(100, "false", "true", "1", "", "");
     runDeltaStreamer(WriteOperationType.UPSERT, false, Option.of(configs));
     assertEquals(1, stageListener.triggerCount);
   }
