nsivabalan commented on code in PR #9913:
URL: https://github.com/apache/hudi/pull/9913#discussion_r1373908550


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -265,6 +271,8 @@ public class StreamSync implements Serializable, Closeable {
 
   private final boolean autoGenerateRecordKeys;
 
+  private final boolean rowBulkInsert;

Review Comment:
   useSparkDatasourceWrite
   or useRowWriter



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -584,10 +605,16 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
                 (SchemaProvider) new DelegatingSchemaProvider(props, 
hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
                     new SimpleSchemaProvider(hoodieSparkContext.jsc(), 
targetSchema, props)))
             .orElse(dataAndCheckpoint.getSchemaProvider());
+        if (rowBulkInsert) {
+          return checkEmpty(new InputBatch(transformed, checkpointStr, 
schemaProvider), resumeCheckpointStr);

Review Comment:
   isn't the 2nd arg, resumeCheckpointStr to InoputBach constr



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commit;
+
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+public class HoodieStreamerDatasetBulkInsertCommitActionExecutor extends 
BaseDatasetBulkInsertCommitActionExecutor {

Review Comment:
   java docs



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -754,56 +798,72 @@ protected Option<String> 
getLatestInstantWithValidCheckpointInfo(Option<HoodieTi
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive 
if needed.
    *
    * @param instantTime         instant time to use for ingest.
-   * @param records             Input Records
-   * @param checkpointStr       Checkpoint String
+   * @param inputBatch          input batch that contains the records, 
checkpoint, and schema provider
    * @param metrics             Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
-  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String 
instantTime, JavaRDD<HoodieRecord> records, String checkpointStr,
+  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String 
instantTime, InputBatch inputBatch,
                                                                  
HoodieIngestionMetrics metrics,
                                                                  Timer.Context 
overallTimerContext) {
     Option<String> scheduledCompactionInstant = Option.empty();
-    // filter dupes if needed
-    if (cfg.filterDupes) {
-      records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), 
records, writeClient.getConfig());
-    }
-
-    boolean isEmpty = records.isEmpty();
-    instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
-    LOG.info("Starting commit  : " + instantTime);
-
     HoodieWriteResult writeResult;
     Map<String, List<String>> partitionToReplacedFileIds = 
Collections.emptyMap();
     JavaRDD<WriteStatus> writeStatusRDD;
-    switch (cfg.operation) {
-      case INSERT:
-        writeStatusRDD = writeClient.insert(records, instantTime);
-        break;
-      case UPSERT:
-        writeStatusRDD = writeClient.upsert(records, instantTime);
-        break;
-      case BULK_INSERT:
-        writeStatusRDD = writeClient.bulkInsert(records, instantTime);
-        break;
-      case INSERT_OVERWRITE:
-        writeResult = writeClient.insertOverwrite(records, instantTime);
-        partitionToReplacedFileIds = 
writeResult.getPartitionToReplaceFileIds();
-        writeStatusRDD = writeResult.getWriteStatuses();
-        break;
-      case INSERT_OVERWRITE_TABLE:
-        writeResult = writeClient.insertOverwriteTable(records, instantTime);
-        partitionToReplacedFileIds = 
writeResult.getPartitionToReplaceFileIds();
-        writeStatusRDD = writeResult.getWriteStatuses();
-        break;
-      case DELETE_PARTITION:
-        List<String> partitions = records.map(record -> 
record.getPartitionPath()).distinct().collect();
-        writeResult = writeClient.deletePartitions(partitions, instantTime);
-        partitionToReplacedFileIds = 
writeResult.getPartitionToReplaceFileIds();
-        writeStatusRDD = writeResult.getWriteStatuses();
-        break;
-      default:
-        throw new HoodieStreamerException("Unknown operation : " + 
cfg.operation);
+    instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
+    LOG.info("Starting commit  : " + instantTime);
+    boolean isEmpty;

Review Comment:
   we could take the opportunity to reuse isEmpty as well. 
   we already have the value for isEmpty from within fetchFromSource. so, we 
should wire that in here instead of checking explicitly again here 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -408,19 +420,25 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> 
syncOnce() throws IOException
         .build();
     String instantTime = metaClient.createNewInstantTime();
 
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
srcRecordsWithCkpt = readFromSource(instantTime);
+    InputBatch inputBatch = readFromSource(instantTime);
+
+    if (inputBatch != null) {
+      final JavaRDD<HoodieRecord> recordsFromSource;
+      if (rowBulkInsert) {

Review Comment:
   we might need some fixes here. 
   looks like we call 
   ```
   setupWriteClient(recordsFromSource);
   ```
   below. 
   within that, we might estimate the size and set some write configs. 
   So, even for row writer, we might have to do that. 
   
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -297,13 +304,17 @@ public StreamSync(HoodieStreamer.Config cfg, SparkSession 
sparkSession, SchemaPr
       this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, 
sparkSession, props, hoodieSparkContext, fs);
       this.errorWriteFailureStrategy = 
ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
-    this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics),
-        this.errorTableWriter, Option.of(props));
+    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
+    this.formatAdapter = new SourceFormatAdapter(source, 
this.errorTableWriter, Option.of(props));
 
     this.transformer = 
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
         
Option.ofNullable(schemaProvider).map(SchemaProvider::getSourceSchema), 
this.errorTableWriter.isPresent());
-
+    if (this.cfg.operation == WriteOperationType.BULK_INSERT && 
source.getSourceType() != Source.SourceType.AVRO) {

Review Comment:
   lets make this only for ROW source



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -584,10 +605,16 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
                 (SchemaProvider) new DelegatingSchemaProvider(props, 
hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
                     new SimpleSchemaProvider(hoodieSparkContext.jsc(), 
targetSchema, props)))
             .orElse(dataAndCheckpoint.getSchemaProvider());
+        if (rowBulkInsert) {
+          return checkEmpty(new InputBatch(transformed, checkpointStr, 
schemaProvider), resumeCheckpointStr);

Review Comment:
   I feel the checkEmpty and return at 3 places is not elegant. 
   can we introduce a local variable which will hold the InputBatch and 
eventually return it and we can do checkEmpty there. 
   
   
   For eg:
   ```
         if (this.userProvidedSchemaProvider != null && 
this.userProvidedSchemaProvider.getTargetSchema() != null) {
           if (rowBulkInsert) {
             inputBatchRowWriterToReturn = new InputBatch(transformed, 
checkpointStr, this.userProvidedSchemaProvider), resumeCheckpointStr);
           } else {
   
             // If the target schema is specified through Avro schema,
             // pass in the schema for the Row-to-Avro conversion
             // to avoid nullability mismatch between Avro schema and Row schema
             if (errorTableWriter.isPresent()
                 && 
props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
                 
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue())) {
               // If the above conditions are met, trigger error events for the 
rows whose conversion to
               // avro records fails.
               avroRDDOptional = transformed.map(
                   rowDataset -> {
                     Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = 
HoodieSparkUtils.safeCreateRDD(rowDataset,
                         HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, 
reconcileSchema,
                         
Option.of(this.userProvidedSchemaProvider.getTargetSchema()));
                     
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
                         .map(evStr -> new ErrorEvent<>(evStr,
                             
ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
                     return safeCreateRDDs._1.toJavaRDD();
                   });
             } else {
               avroRDDOptional = transformed.map(
                   rowDataset -> getTransformedRDD(rowDataset, reconcileSchema, 
this.userProvidedSchemaProvider.getTargetSchema()));
             }
             schemaProvider = this.userProvidedSchemaProvider;
           }
         } else {
           Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, 
cfg.targetBasePath);
           // Deduce proper target (writer's) schema for the transformed 
dataset, reconciling its
           // schema w/ the table's one
           Option<Schema> targetSchemaOpt = transformed.map(df -> {
             Schema sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
                 
latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName)));
             // Target (writer's) schema is determined based on the incoming 
source schema
             // and existing table's one, reconciling the two (if necessary) 
based on configuration
             return HoodieSparkSqlWriter.deduceWriterSchema(
                 sourceSchema,
                 
HoodieConversionUtils.<Schema>toScalaOption(latestTableSchemaOpt),
                 
HoodieConversionUtils.<InternalSchema>toScalaOption(Option.empty()),
                 HoodieConversionUtils.fromProperties(props));
           });
           // Override schema provider with the reconciled target schema
           schemaProvider = targetSchemaOpt.map(targetSchema ->
                   (SchemaProvider) new DelegatingSchemaProvider(props, 
hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
                       new SimpleSchemaProvider(hoodieSparkContext.jsc(), 
targetSchema, props)))
               .orElse(dataAndCheckpoint.getSchemaProvider());
           if (rowBulkInsert) {
             inputBatchRowWriterToReturn = new InputBatch(transformed, 
checkpointStr, schemaProvider);
           } else {
             // Rewrite transformed records into the expected target schema
             avroRDDOptional = transformed.map(t -> getTransformedRDD(t, 
reconcileSchema, schemaProvider.getTargetSchema()));
           }
         }
       } else {
         if (rowBulkInsert) {
           inputBatchRowWriterToReturn = 
formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
         } else {
           // Pull the data from the source & prepare the write
           InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
               formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, 
cfg.sourceLimit);
           avroRDDOptional = dataAndCheckpoint.getBatch();
           checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
           schemaProvider = dataAndCheckpoint.getSchemaProvider();
         }
       }
   ```
   
   
   After this, we could check checkEmpty and return right away for row writer 
use-case. 
   but for avro we might continue further on a need basis. 
   
   also, checkEmpty can be generalized or funcationlized so that we can re-use 
for both avro and spark rows
   
   
   
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -584,10 +605,16 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
                 (SchemaProvider) new DelegatingSchemaProvider(props, 
hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
                     new SimpleSchemaProvider(hoodieSparkContext.jsc(), 
targetSchema, props)))
             .orElse(dataAndCheckpoint.getSchemaProvider());
+        if (rowBulkInsert) {
+          return checkEmpty(new InputBatch(transformed, checkpointStr, 
schemaProvider), resumeCheckpointStr);

Review Comment:
   let me know what do you think of the suggestion. happy to jam 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -754,56 +798,72 @@ protected Option<String> 
getLatestInstantWithValidCheckpointInfo(Option<HoodieTi
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive 
if needed.
    *
    * @param instantTime         instant time to use for ingest.
-   * @param records             Input Records
-   * @param checkpointStr       Checkpoint String
+   * @param inputBatch          input batch that contains the records, 
checkpoint, and schema provider
    * @param metrics             Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
-  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String 
instantTime, JavaRDD<HoodieRecord> records, String checkpointStr,
+  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String 
instantTime, InputBatch inputBatch,
                                                                  
HoodieIngestionMetrics metrics,
                                                                  Timer.Context 
overallTimerContext) {
     Option<String> scheduledCompactionInstant = Option.empty();
-    // filter dupes if needed
-    if (cfg.filterDupes) {
-      records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), 
records, writeClient.getConfig());
-    }
-
-    boolean isEmpty = records.isEmpty();
-    instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
-    LOG.info("Starting commit  : " + instantTime);
-
     HoodieWriteResult writeResult;
     Map<String, List<String>> partitionToReplacedFileIds = 
Collections.emptyMap();
     JavaRDD<WriteStatus> writeStatusRDD;
-    switch (cfg.operation) {
-      case INSERT:
-        writeStatusRDD = writeClient.insert(records, instantTime);
-        break;
-      case UPSERT:
-        writeStatusRDD = writeClient.upsert(records, instantTime);
-        break;
-      case BULK_INSERT:
-        writeStatusRDD = writeClient.bulkInsert(records, instantTime);
-        break;
-      case INSERT_OVERWRITE:
-        writeResult = writeClient.insertOverwrite(records, instantTime);
-        partitionToReplacedFileIds = 
writeResult.getPartitionToReplaceFileIds();
-        writeStatusRDD = writeResult.getWriteStatuses();
-        break;
-      case INSERT_OVERWRITE_TABLE:
-        writeResult = writeClient.insertOverwriteTable(records, instantTime);
-        partitionToReplacedFileIds = 
writeResult.getPartitionToReplaceFileIds();
-        writeStatusRDD = writeResult.getWriteStatuses();
-        break;
-      case DELETE_PARTITION:
-        List<String> partitions = records.map(record -> 
record.getPartitionPath()).distinct().collect();
-        writeResult = writeClient.deletePartitions(partitions, instantTime);
-        partitionToReplacedFileIds = 
writeResult.getPartitionToReplaceFileIds();
-        writeStatusRDD = writeResult.getWriteStatuses();
-        break;
-      default:
-        throw new HoodieStreamerException("Unknown operation : " + 
cfg.operation);
+    instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
+    LOG.info("Starting commit  : " + instantTime);
+    boolean isEmpty;
+    if (rowBulkInsert) {
+      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().get();
+      isEmpty = df.isEmpty();
+
+      HoodieConfig hoodieConfig = new 
HoodieConfig(HoodieStreamer.Config.getProps(fs, cfg));

Review Comment:
   lets move this to a method called prepareHoodieConfigForRowWriter()



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -297,13 +304,17 @@ public StreamSync(HoodieStreamer.Config cfg, SparkSession 
sparkSession, SchemaPr
       this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, 
sparkSession, props, hoodieSparkContext, fs);
       this.errorWriteFailureStrategy = 
ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
-    this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics),
-        this.errorTableWriter, Option.of(props));
+    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
+    this.formatAdapter = new SourceFormatAdapter(source, 
this.errorTableWriter, Option.of(props));
 
     this.transformer = 
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
         
Option.ofNullable(schemaProvider).map(SchemaProvider::getSourceSchema), 
this.errorTableWriter.isPresent());
-
+    if (this.cfg.operation == WriteOperationType.BULK_INSERT && 
source.getSourceType() != Source.SourceType.AVRO) {

Review Comment:
   actually, w/ the cur state of patch, lets not bring in RECORD_MERGER_IMPLS. 
   lets use "hoodie.datasource.write.row.writer.enable" only. 
   if its explicitly set by the user, we can take the new route. 
   if not, we go w/ regular flow or write client based one. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -408,19 +420,25 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> 
syncOnce() throws IOException
         .build();
     String instantTime = metaClient.createNewInstantTime();
 
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
srcRecordsWithCkpt = readFromSource(instantTime);
+    InputBatch inputBatch = readFromSource(instantTime);
+
+    if (inputBatch != null) {
+      final JavaRDD<HoodieRecord> recordsFromSource;

Review Comment:
   lets make this Option<> and avoid instantiating w/ empty RDD



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -408,19 +420,25 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> 
syncOnce() throws IOException
         .build();
     String instantTime = metaClient.createNewInstantTime();
 
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
srcRecordsWithCkpt = readFromSource(instantTime);
+    InputBatch inputBatch = readFromSource(instantTime);
+
+    if (inputBatch != null) {
+      final JavaRDD<HoodieRecord> recordsFromSource;
+      if (rowBulkInsert) {

Review Comment:
   If you feel, thats too much work/ or involves too much change, let me know. 
lets see if we can punt it. but lets take a stab at fixing it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to