This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f0bcee3  [HUDI-3561] Avoid including whole `MultipleSparkJobExecutionStrategy` object into the closure for Spark to serialize (#4954)
f0bcee3 is described below

commit f0bcee3c014cf59bdad3eaf8212d94a589073f0b
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Mon Mar 7 10:42:03 2022 -0800

    [HUDI-3561] Avoid including whole `MultipleSparkJobExecutionStrategy` object into the closure for Spark to serialize (#4954)
    
    - Avoid including whole MultipleSparkJobExecutionStrategy object into the closure for Spark to serialize
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 +-
 .../MultipleSparkJobExecutionStrategy.java         | 52 ++++++++++++----------
 2 files changed, 30 insertions(+), 24 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c64b4ed..c884656 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1051,7 +1051,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public BulkInsertSortMode getBulkInsertSortMode() {
-    String sortMode = getString(BULK_INSERT_SORT_MODE);
+    String sortMode = getStringOrDefault(BULK_INSERT_SORT_MODE);
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 91d1f4e..54c1c9f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -25,7 +29,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.ConcatenatingIterator;
-import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ClusteringOperation;
@@ -58,11 +62,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -246,21 +245,28 @@ public abstract class MultipleSparkJobExecutionStrategy<T 
extends HoodieRecordPa
    */
   private JavaRDD<HoodieRecord<T>> 
readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                 
List<ClusteringOperation> clusteringOps) {
-    return jsc.parallelize(clusteringOps, 
clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
-      List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
-      clusteringOpsPartition.forEachRemaining(clusteringOp -> {
-        try {
-          Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema()));
-          HoodieFileReader<IndexedRecord> baseFileReader = 
HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new 
Path(clusteringOp.getDataFilePath()));
-          
iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
-        } catch (IOException e) {
-          throw new HoodieClusteringException("Error reading input data for " 
+ clusteringOp.getDataFilePath()
-              + " and " + clusteringOp.getDeltaFilePaths(), e);
-        }
-      });
+    SerializableConfiguration hadoopConf = new 
SerializableConfiguration(getHoodieTable().getHadoopConf());
+    HoodieWriteConfig writeConfig = getWriteConfig();
+
+    // NOTE: It's crucial to make sure that we don't capture whole "this" 
object into the
+    //       closure, as this might lead to issues attempting to serialize its 
nested fields
+    return jsc.parallelize(clusteringOps, clusteringOps.size())
+        .mapPartitions(clusteringOpsPartition -> {
+          List<Iterator<IndexedRecord>> iteratorsForPartition = new 
ArrayList<>();
+          clusteringOpsPartition.forEachRemaining(clusteringOp -> {
+            try {
+              Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(writeConfig.getSchema()));
+              HoodieFileReader<IndexedRecord> baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new 
Path(clusteringOp.getDataFilePath()));
+              
iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
+            } catch (IOException e) {
+              throw new HoodieClusteringException("Error reading input data 
for " + clusteringOp.getDataFilePath()
+                  + " and " + clusteringOp.getDeltaFilePaths(), e);
+            }
+          });
 
-      return new ConcatenatingIterator<>(iteratorsForPartition);
-    }).map(this::transform);
+          return new ConcatenatingIterator<>(iteratorsForPartition);
+        })
+        .map(record -> transform(record, writeConfig));
   }
 
   /**
@@ -279,12 +285,12 @@ public abstract class MultipleSparkJobExecutionStrategy<T 
extends HoodieRecordPa
   /**
    * Transform IndexedRecord into HoodieRecord.
    */
-  private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
+  private static <T> HoodieRecord<T> transform(IndexedRecord indexedRecord, 
HoodieWriteConfig writeConfig) {
     GenericRecord record = (GenericRecord) indexedRecord;
     Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
-    if (!getWriteConfig().populateMetaFields()) {
+    if (!writeConfig.populateMetaFields()) {
       try {
-        keyGeneratorOpt = Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(getWriteConfig().getProps())));
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
       } catch (IOException e) {
         throw new HoodieIOException("Only BaseKeyGenerators are supported when 
meta columns are disabled ", e);
       }

Reply via email to