This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 184231de7b02e33baad0d6ee5b08f2075f0e7f9c Author: YueZhang <[email protected]> AuthorDate: Thu Mar 20 16:58:36 2025 +0800 finish Spark call update hashing config need more test --- .../org/apache/hudi/config/HoodieWriteConfig.java | 8 + .../bucket/PartitionBucketIndexCalculator.java | 24 +- .../index/bucket/PartitionBucketIndexUtils.java | 15 +- .../BucketIndexBulkInsertPartitionerWithRows.java | 9 +- .../RDDSimpleBucketBulkInsertPartitioner.java | 2 +- .../BucketBulkInsertDataInternalWriterHelper.java | 2 +- .../action/commit/SparkBucketIndexPartitioner.java | 2 +- .../model/PartitionBucketIndexHashingConfig.java | 4 + .../hudi/common/model/WriteOperationType.java | 1 + .../DatasetBucketRescaleCommitActionExecutor.java | 77 ++++++ .../org/apache/hudi/HoodieSparkSqlWriter.scala | 12 +- .../spark/sql/hudi/ProvidesHoodieConfig.scala | 24 +- .../procedures/PartitionBucketIndexManager.scala | 289 +++++++++++++++++++++ .../sql/hudi/common/HoodieSparkSqlTestBase.scala | 1 + .../spark/sql/hudi/dml/TestInsertTable.scala | 88 +++++++ .../TestPartitionBucketIndexManager.scala | 31 +++ 16 files changed, 575 insertions(+), 14 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 557f8672e19..623f02e623c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1959,6 +1959,14 @@ public class HoodieWriteConfig extends HoodieConfig { return getString(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT); } + public String getBucketIndexPartitionExpression() { + return getString(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS); + } + + public String getBucketIndexPartitionRuleType() { + return getString(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE); + } + public String getIndexClass() { return getString(HoodieIndexConfig.INDEX_CLASS_NAME); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java index 65a08363402..acd48bb9961 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -55,7 +56,7 @@ public class PartitionBucketIndexCalculator implements Serializable { private static final int CACHE_SIZE = 100_000; private PartitionBucketIndexHashingConfig hashingConfig; private int defaultBucketNumber; - private final String instantToLoad; + private String instantToLoad; // Cache for partition to bucket number mapping @SuppressWarnings("unchecked") private final Map<String, Integer> partitionToBucketCache = new LRUMap(CACHE_SIZE); @@ -89,6 +90,14 @@ public class PartitionBucketIndexCalculator implements Serializable { } } + private PartitionBucketIndexCalculator(String instantToLoad, PartitionBucketIndexHashingConfig config) { + this.hashingConfig = config; + 
this.defaultBucketNumber = config.getDefaultBucketNumber(); + String expressions = config.getExpressions(); + String ruleType = config.getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + private void init(HoodieStorage storage, StoragePath hashingConfigPath) { Option<PartitionBucketIndexHashingConfig> config = PartitionBucketIndexUtils.loadHashingConfig(storage, hashingConfigPath); ValidationUtils.checkArgument(config.isPresent()); @@ -124,6 +133,15 @@ public class PartitionBucketIndexCalculator implements Serializable { }); } + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, PartitionBucketIndexHashingConfig config) { + // Using instantToLoad as the key for the cache + return INSTANCES.computeIfAbsent(instantToLoad, + key -> { + LOG.info("Creating new PartitionBucketIndexCalculator instance for instantToLoad: {}", key); + return new PartitionBucketIndexCalculator(key, config); + }); + } + /** * Computes the bucket number for a given partition path * @@ -182,6 +200,10 @@ public class PartitionBucketIndexCalculator implements Serializable { return partitionToBucketCache.size(); } + public Map<String, Integer> getPartitionToBucket() { + return new HashMap<>(partitionToBucketCache); + } + public void clearCache() { partitionToBucketCache.clear(); LOG.info("Cleared partition to bucket number cache"); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java index 69e29bd92e3..ad1e1d0b3bf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; public class PartitionBucketIndexUtils { @@ -77,11 +78,14 @@ public class PartitionBucketIndexUtils { return false; } String hashingInstant = StringUtils.isNullOrEmpty(instant) ? 
INITIAL_HASHING_CONFIG_INSTANT : instant; - HoodieStorage storage = metaClient.getStorage(); PartitionBucketIndexHashingConfig hashingConfig = new PartitionBucketIndexHashingConfig(expressions, defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, hashingInstant); - StoragePath hashingConfigPath = new StoragePath(metaClient.getHashingMetadataConfigPath(), hashingConfig.getFilename()); + return saveHashingConfig(hashingConfig, metaClient); + } + public static boolean saveHashingConfig(PartitionBucketIndexHashingConfig hashingConfig, HoodieTableMetaClient metaClient) { + StoragePath hashingConfigPath = new StoragePath(metaClient.getHashingMetadataConfigPath(), hashingConfig.getFilename()); + HoodieStorage storage = metaClient.getStorage(); try { Option<byte []> content = Option.of(hashingConfig.toJsonString().getBytes(StandardCharsets.UTF_8)); storage.createImmutableFileInPath(hashingConfigPath, content.map(HoodieInstantWriter::convertByteArrayToWriter)); @@ -153,4 +157,11 @@ public class PartitionBucketIndexUtils { } return hashingConfigName.substring(0, dotIndex); } + + public static Map<String, Integer> getAllBucketNumbers(PartitionBucketIndexCalculator calc, List<String> partitions) { + for (String partition : partitions) { + calc.computeNumBuckets(partition); + } + return calc.getPartitionToBucket(); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java index e80c8bfac0e..3a94fa78cb3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java @@ -41,12 +41,19 @@ public class BucketIndexBulkInsertPartitionerWithRows implements BulkInsertParti this.indexKeyFields = indexKeyFields; this.bucketNum = bucketNum; String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad(); - this.isPartitionBucketIndexEnable = StringUtils.isNullOrEmpty(hashingInstantToLoad); + this.isPartitionBucketIndexEnable = StringUtils.nonEmpty(hashingInstantToLoad); if (isPartitionBucketIndexEnable) { calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient()); } } + public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum, PartitionBucketIndexCalculator calc) { + this.indexKeyFields = indexKeyFields; + this.bucketNum = bucketNum; + this.isPartitionBucketIndexEnable = true; + this.calc = calc; + } + @Override public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputPartitions) { return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, bucketNum, outputPartitions, calc, isPartitionBucketIndexEnable); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java index d651342c368..393f0054398 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java @@ -49,7 +49,7 
@@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> ValidationUtils.checkArgument(table.getIndex() instanceof HoodieSimpleBucketIndex); this.isNonBlockingConcurrencyControl = table.getConfig().isNonBlockingConcurrencyControl(); String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad(); - this.isPartitionBucketIndexEnable = StringUtils.isNullOrEmpty(hashingInstantToLoad); + this.isPartitionBucketIndexEnable = StringUtils.nonEmpty(hashingInstantToLoad); if (isPartitionBucketIndexEnable) { calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient()); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java index d2b6a04c2e1..7f55c02b851 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java @@ -71,7 +71,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte this.handles = new HashMap<>(); this.isNonBlockingConcurrencyControl = writeConfig.isNonBlockingConcurrencyControl(); String hashingInstantToLoad = writeConfig.getHashingConfigInstantToLoad(); - this.isPartitionBucketIndexEnable = StringUtils.isNullOrEmpty(hashingInstantToLoad); + this.isPartitionBucketIndexEnable = StringUtils.nonEmpty(hashingInstantToLoad); if (isPartitionBucketIndexEnable) { calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, hoodieTable.getMetaClient()); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java index c760d2aabdd..b099392e9e1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java @@ -99,7 +99,7 @@ public class SparkBucketIndexPartitioner<T> extends SparkHoodiePartitioner<T> { + table.getIndex().getClass().getSimpleName()); } String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad(); - this.isPartitionBucketIndexEnable = StringUtils.isNullOrEmpty(hashingInstantToLoad); + this.isPartitionBucketIndexEnable = StringUtils.nonEmpty(hashingInstantToLoad); if (isPartitionBucketIndexEnable) { calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java index 46b38896925..a658003f851 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java @@ -62,6 +62,10 @@ public class PartitionBucketIndexHashingConfig implements Serializable { return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); } + public String getInstant() { + return this.instant; + } + 
public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception { if (jsonStr == null || jsonStr.isEmpty()) { // For empty commit file (no data or somethings bad happen). diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java index 1036e8fca44..bc82a04023c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -41,6 +41,7 @@ public enum WriteOperationType { BOOTSTRAP("bootstrap"), // insert overwrite with static partitioning INSERT_OVERWRITE("insert_overwrite"), + BUCKET_RESCALE("bucket_rescale"), // cluster CLUSTER("cluster"), // delete partition diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java new file mode 100644 index 00000000000..1c6d90eb6f7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.commit; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.execution.bulkinsert.BucketIndexBulkInsertPartitionerWithRows; +import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator; +import org.apache.hudi.index.bucket.PartitionBucketIndexUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertOverwriteCommitActionExecutor { + + private final PartitionBucketIndexCalculator calc; + + public DatasetBucketRescaleCommitActionExecutor(HoodieWriteConfig config, + SparkRDDWriteClient writeClient, + String instantTime) { + super(config, writeClient, instantTime); + String expression = config.getBucketIndexPartitionExpression(); + String instant = config.getHashingConfigInstantToLoad(); + String rule = config.getBucketIndexPartitionRuleType(); + int bucketNumber = config.getBucketIndexNumBuckets(); + PartitionBucketIndexHashingConfig hashingConfig = new PartitionBucketIndexHashingConfig(expression, + bucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, instant); + this.calc = PartitionBucketIndexCalculator.getInstance(instantTime, hashingConfig); + } + + /** + * Create BulkInsertPartitioner with prepared PartitionBucketIndexCalculator + * @param populateMetaFields + * @param isTablePartitioned + * @return BulkInsertPartitioner + */ + @Override + protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean populateMetaFields, boolean isTablePartitioned) { + return new BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(), + writeConfig.getBucketIndexNumBuckets(), calc); + } + + /** + * create new hashing_config during afterExecute and before commit finished + * @param result + */ + @Override + protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) { + super.afterExecute(result); + + PartitionBucketIndexHashingConfig hashingConfig = calc.getHashingConfig(); + boolean res = PartitionBucketIndexUtils.saveHashingConfig(hashingConfig, table.getMetaClient()); + ValidationUtils.checkArgument(res); + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 58cfebf596d..7c59e433c8a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -30,7 +30,7 @@ import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, DatasetBulkInsertOverwriteCommitActionExecutor, DatasetBulkInsertOverwriteTableCommitActionExecutor} +import org.apache.hudi.commit.{DatasetBucketRescaleCommitActionExecutor, 
DatasetBulkInsertCommitActionExecutor, DatasetBulkInsertOverwriteCommitActionExecutor, DatasetBulkInsertOverwriteTableCommitActionExecutor} import org.apache.hudi.common.config._ import org.apache.hudi.common.engine.HoodieEngineContext import org.apache.hudi.common.fs.FSUtils @@ -40,7 +40,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion, TableSchemaResolver} import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator -import org.apache.hudi.common.util.{CommitUtils, Option => HOption, StringUtils} +import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} @@ -62,7 +62,6 @@ import org.apache.hudi.sync.common.util.SyncUtilHelpers import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException import org.apache.hudi.util.{SparkConfigUtils, SparkKeyGenUtils} import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys - import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.hadoop.conf.Configuration @@ -81,7 +80,6 @@ import org.apache.spark.sql.types.StructType import org.slf4j.LoggerFactory import java.util.function.BiConsumer - import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Success, Try} @@ -815,7 +813,7 @@ class HoodieSparkSqlWriterInternal { val overwriteOperationType = Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE)) .map(WriteOperationType.fromValue) .orNull - val instantTime = writeClient.createNewInstantTime() + var instantTime = writeClient.createNewInstantTime() val executor = mode match { case _ if overwriteOperationType == null => // Don't need to overwrite @@ -823,6 +821,10 @@ class HoodieSparkSqlWriterInternal { case SaveMode.Append if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE => // INSERT OVERWRITE PARTITION uses Append mode new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, writeClient, instantTime) + case SaveMode.Append if overwriteOperationType == WriteOperationType.BUCKET_RESCALE => { + instantTime = writeConfig.getHashingConfigInstantToLoad() + new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient, instantTime) + } case SaveMode.Overwrite if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE_TABLE => new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, writeClient, instantTime) case _ => diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index ff3261bbcd6..6cf5053bdce 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -33,7 +33,6 @@ import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator} import 
org.apache.hudi.sql.InsertMode import org.apache.hudi.sync.common.HoodieSyncConfig - import org.apache.spark.internal.Logging import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable @@ -50,8 +49,8 @@ import org.apache.spark.sql.types.StructType import org.slf4j.LoggerFactory import java.util.Locale - import scala.collection.JavaConverters._ +import scala.collection.mutable trait ProvidesHoodieConfig extends Logging { @@ -85,6 +84,27 @@ trait ProvidesHoodieConfig extends Logging { defaultOpts = defaultOpts, overridingOpts = overridingOpts) } + def buildBucketRescaleHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = { + val sparkSession: SparkSession = hoodieCatalogTable.spark + val tableConfig = hoodieCatalogTable.tableConfig + val preCombineField = Option(tableConfig.getPreCombineField).getOrElse("") + val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) + + val defaultOpts = Map[String, String]( + OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL, + HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.BUCKET_RESCALE.value(), + KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, + SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName, + SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL, + HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key), + HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> "false" + ) + + val overridingOpts = buildOverridingOpts(hoodieCatalogTable, preCombineField) + combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, + defaultOpts = defaultOpts, overridingOpts = overridingOpts) + } + /** * Deduce the sql write operation for INSERT_INTO */ diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala new file mode 100644 index 00000000000..4aaeec80a1a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieSparkSqlWriter} +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.commit.DatasetBucketRescaleCommitActionExecutor +import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig, SerializableSchema} +import org.apache.hudi.common.engine.HoodieEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.{FileSlice, PartitionBucketIndexHashingConfig, WriteOperationType} +import org.apache.hudi.common.table.read.HoodieFileGroupReader +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.common.util.{Option, StringUtils} +import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.index.bucket.{PartitionBucketIndexCalculator, PartitionBucketIndexUtils} +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.internal.schema.utils.SerDeHelper +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration +import org.apache.hudi.storage.{StoragePath, StoragePathInfo} +import org.apache.hudi.table.SparkBroadcastManager +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable +import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import org.apache.spark.sql.hudi.ProvidesHoodieConfig +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util +import java.util.function.Supplier +import java.util.stream.Collectors +import scala.collection.JavaConverters._ +import scala.collection.mutable + +class PartitionBucketIndexManager() extends BaseProcedure + with ProcedureBuilder + with PredicateHelper + with ProvidesHoodieConfig + with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "overwrite", DataTypes.StringType), + ProcedureParameter.optional(2, "bucket-number", DataTypes.IntegerType, -1), + ProcedureParameter.optional(3, "add", DataTypes.StringType), + ProcedureParameter.optional(4, "dry-run", DataTypes.BooleanType, true), + ProcedureParameter.optional(5, "rollback", DataTypes.StringType), + ProcedureParameter.optional(6, "show-config", DataTypes.BooleanType, false), + ProcedureParameter.optional(7, "rule", DataTypes.StringType, "regex") + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("result", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("details", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String] + val 
overwrite = getArgValueOrDefault(args, PARAMETERS(1)).orNull.asInstanceOf[String] + val bucketNumber = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int] + val add = getArgValueOrDefault(args, PARAMETERS(3)).orNull.asInstanceOf[String] + val dryRun = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean] + val rollback = getArgValueOrDefault(args, PARAMETERS(5)).orNull.asInstanceOf[String] + val showConfig = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean] + val rule = getArgValueOrDefault(args, PARAMETERS(7)).orNull.asInstanceOf[String] + + try { + // Get table metadata + val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table) + val basePath = hoodieCatalogTable.tableLocation + val metaClient = createMetaClient(jsc, basePath) + val writeClient = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty, + scala.Option.apply(table)) + val context = writeClient.getEngineContext + + // Determine which operation to perform + if (showConfig) { + handleShowConfig(metaClient) + } else if (rollback != null) { + handleRollback(metaClient, rollback) + } else if (overwrite != null) { + handleOverwrite(hoodieCatalogTable, writeClient, context, metaClient, overwrite, bucketNumber, rule, dryRun) + } else if (add != null) { + handleAdd(metaClient, add, dryRun) + } else { + Seq(Row("ERROR", "INVALID_OPERATION", "No valid operation specified")) + } + } catch { + case e: Exception => Seq(Row("ERROR", "EXCEPTION", e.getMessage)) + } + } + + /** + * Handle the overwrite operation. + */ + private def handleOverwrite(catalog: HoodieCatalogTable, + writeClient: SparkRDDWriteClient[_], + context : HoodieEngineContext, + metaClient: HoodieTableMetaClient, + expression: String, + bucketNumber: Int, + rule: String, + dryRun: Boolean): Seq[Row] = { + val config = buildBucketRescaleHoodieConfig(catalog).asJava + val basePath = metaClient.getBasePath + val mdtEnable = metaClient.getStorage().exists(new StoragePath(metaClient.getBasePath, HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH)) + + // get all partition paths + val allPartitions = FSUtils.getAllPartitionPaths(context, metaClient.getStorage, metaClient.getBasePath, mdtEnable) + + // get Map<partitionPath, bucketNumber> based on latest hashing_config + val latestHashingConfigInstant = PartitionBucketIndexUtils.getHashingConfigInstantToLoad(metaClient) + val calcWithLatestInstant = PartitionBucketIndexCalculator.getInstance(latestHashingConfigInstant, metaClient) + val partition2BucketWithLatestHashingConfig = PartitionBucketIndexUtils.getAllBucketNumbers(calcWithLatestInstant, allPartitions) + + // get Map<partitionPath, bucketNumber> based on new given expression + val defaultBucketNumber = if (bucketNumber != -1) { + bucketNumber + } else { + // reuse latest default bucket number + calcWithLatestInstant.getHashingConfig.getDefaultBucketNumber + } + val instantTime = writeClient.createNewInstantTime() + config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(), expression) + config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key(), rule) + config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key(), instantTime) + config.put(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), bucketNumber.toString) + + val newConfig = new PartitionBucketIndexHashingConfig(expression, defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, instantTime) + val calcWithNewExpression = PartitionBucketIndexCalculator.getInstance(instantTime, 
newConfig)
+    val partition2BucketWithNewHashingConfig = PartitionBucketIndexUtils.getAllBucketNumbers(calcWithNewExpression, allPartitions)
+
+    // get partitions that need to be rescaled
+    val rescalePartitionsMap = getDifferentPartitions(partition2BucketWithNewHashingConfig.asScala, partition2BucketWithLatestHashingConfig.asScala)
+    val partitionsToRescale = rescalePartitionsMap.keys
+
+    // get all fileSlices that need to be read
+    val allFilesMap = FSUtils.getFilesInPartitions(context, metaClient.getStorage(), HoodieMetadataConfig.newBuilder.enable(mdtEnable).build,
+      metaClient.getBasePath.toString, partitionsToRescale.toArray)
+    val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList
+    val view = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline, files.asJava)
+    val allFileSlice = partitionsToRescale.flatMap(partitionPath => {
+      view.getLatestFileSlices(partitionPath).iterator().asScala
+    }).toList
+
+    // read all file slices in parallel and build a DataFrame
+    var tableSchemaWithMetaFields: Schema = null
+    try tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new TableSchemaResolver(metaClient).getTableAvroSchema(false), false)
+    catch {
+      case e: Exception =>
+        throw new HoodieException("Failed to get table schema during bucket rescale", e)
+    }
+
+    // broadcast reader context.
+    val broadcastManager = new SparkBroadcastManager(context, metaClient)
+    broadcastManager.prepareAndBroadcast()
+    val sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields)
+    val serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields)
+    val latestInstantTime = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+
+    val res: RDD[InternalRow] = spark.sparkContext.parallelize(allFileSlice, allFileSlice.size).flatMap(fileSlice => {
+      // instantiate other supporting objects
+      val readerSchema = serializableTableSchemaWithMetaFields.get
+      val readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(basePath)
+      val internalSchemaOption: Option[InternalSchema] = Option.empty()
+      // instantiate the file group reader
+      val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(),
+        metaClient.getStorage,
+        basePath.toString,
+        latestInstantTime.requestedTime(),
+        fileSlice,
+        readerSchema,
+        readerSchema,
+        internalSchemaOption, // schema evolution is not supported for now
+        metaClient,
+        metaClient.getTableConfig.getProps,
+        0,
+        java.lang.Long.MAX_VALUE,
+        HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue())
+      fileGroupReader.initRecordIterators()
+      val iterator = fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
+      iterator.asScala
+    })
+    val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, res, sparkSchemaWithMetaFields)
+
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
+      sparkSession.sqlContext,
+      SaveMode.Overwrite,
+      config.asScala.toMap,
+      dataFrame)
+
+    val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry Run: $dryRun"
+    Seq(Row(if (success) "SUCCESS" else "FAILED", "OVERWRITE", details))
+  }
+
+  /**
+   * Compares two maps of partition paths to bucket numbers and returns a map of partition paths
+   * that have different bucket numbers in the two maps, using the new bucket numbers.
+ * + * @param newMap The new map of partition paths to bucket numbers + * @param oldMap The old map of partition paths to bucket numbers + * @return A map containing only the partition paths that have different bucket numbers, with values from newMap + */ + def getDifferentPartitions(newMap: mutable.Map[String, Integer], oldMap: mutable.Map[String, Integer]): mutable.Map[String, Integer] = { + newMap.filter { + case (partitionPath, bucketNumber) => + // Include partition path if it's not in old map or has a different bucket number + !oldMap.contains(partitionPath) || oldMap(partitionPath) != bucketNumber + } + } + + /** + * Handle the add operation. + */ + private def handleAdd(metaClient: HoodieTableMetaClient, expression: String, dryRun: Boolean): Seq[Row] = { + // In a real implementation, this would call PartitionBucketIndexManager + // For now, just return a placeholder result + val details = s"Expression: $expression, Dry Run: $dryRun" + + // Here would be the actual call to PartitionBucketIndexManager + // PartitionBucketIndexManager.addExpression(metaClient, expression, dryRun) + + Seq(Row("SUCCESS", "ADD", details)) + } + + /** + * Handle the rollback operation. + */ + private def handleRollback(metaClient: HoodieTableMetaClient, instantTime: String): Seq[Row] = { + // In a real implementation, this would call PartitionBucketIndexManager + // For now, just return a placeholder result + val details = s"Rolled back bucket rescale action: $instantTime" + + // Here would be the actual call to PartitionBucketIndexManager + // PartitionBucketIndexManager.rollback(metaClient, instantTime) + + Seq(Row("SUCCESS", "ROLLBACK", details)) + } + + /** + * Handle the show-config operation. + */ + private def handleShowConfig(metaClient: HoodieTableMetaClient): Seq[Row] = { + + + + Seq(Row("SUCCESS", "SHOW_CONFIG", null)) + } + + override def build: Procedure = new PartitionBucketIndexManager() +} + +object PartitionBucketIndexManager { + val NAME = "PartitionBucketIndexManager" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new PartitionBucketIndexManager() + } +} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala index e0e60e3f6b4..5fd8fd4ad19 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala @@ -74,6 +74,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC")) protected lazy val spark: SparkSession = SparkSession.builder() .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath) + .config("spark.driver.bindAddress", "127.0.0.1") .config("spark.sql.session.timeZone", "UTC") .config("hoodie.insert.shuffle.parallelism", "4") .config("hoodie.upsert.shuffle.parallelism", "4") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 7c1bc0aa2a2..532a000f0f6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -1847,6 +1847,94 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } + test("Test Bulk Insert Into Partition Bucket Index Table") { + withSQLConf( + "hoodie.datasource.write.operation" -> "bulk_insert", + "hoodie.bulkinsert.shuffle.parallelism" -> "1") { + Seq("mor", "cow").foreach { tableType => + Seq("true", "false").foreach { bulkInsertAsRow => + withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id,name', + | type = '$tableType', + | preCombineField = 'ts', + | hoodie.index.type = 'BUCKET', + | hoodie.bucket.index.hash.field = 'id,name', + | hoodie.datasource.write.row.writer.enable = '$bulkInsertAsRow') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}' + """.stripMargin) + + // Note: Do not write the field alias, the partition field must be placed last. + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (3, 'a3,3', 30, 3000, "2021-01-07") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3,3", 30.0, 3000, "2021-01-07") + ) + + // for COW with disabled Spark native row writer, multiple bulk inserts are restricted + if (tableType != "cow" && bulkInsertAsRow != "false") { + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1', 10, 1000, "2021-01-05"), + | (3, "a3", 30, 3000, "2021-01-07") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(1, "a1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3,3", 30.0, 3000, "2021-01-07"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + + // there are two files in partition(dt = '2021-01-05') + checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")( + Seq(2) + ) + + // would generate 6 other files in partition(dt = '2021-01-05') + spark.sql( + s""" + | insert into $tableName values + | (4, 'a1,1', 10, 1000, "2021-01-05"), + | (5, 'a1,1', 10, 1000, "2021-01-05"), + | (6, 'a1,1', 10, 1000, "2021-01-05"), + | (7, 'a1,1', 10, 1000, "2021-01-05"), + | (8, 'a1,1', 10, 1000, "2021-01-05"), + | (10, 'a3,3', 30, 3000, "2021-01-05") + """.stripMargin) + + checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")( + Seq(8) + ) + } + } + } + } + } + } + test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET and disabled Spark native row writer") { withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert", "hoodie.bulkinsert.shuffle.parallelism" -> "1") { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala new file mode 100644 index 00000000000..93d2a557afe --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala @@ -0,0 +1,31 @@ +package org.apache.spark.sql.hudi.procedure + +import 
org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.index.bucket.PartitionBucketIndexUtils
+
+class TestPartitionBucketIndexManager extends HoodieSparkProcedureTestBase {
+
+  test("Case1: Test Call PartitionBucketIndexManager Procedure with show-config") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      val metaClient = getTableMetaClient(tablePath)
+      val expressions = "\\d{4}-(06-(01|17|18)|11-(01|10|11)),256"
+      val rule = "regex"
+      val defaultBucketNumber = 10
+      PartitionBucketIndexUtils.initHashingConfig(metaClient, expressions, rule, defaultBucketNumber, null)
+
+      spark.sql(s"""call PartitionBucketIndexManager(table => '$tableName', show-config => true)""").collect()
+
+    }
+  }
+
+  private def getTableMetaClient(tablePath: String): HoodieTableMetaClient = {
+    HoodieTableMetaClient.builder()
+      .setBasePath(tablePath)
+      .setConf(HadoopFSUtils.getStorageConf(spark.sparkContext.hadoopConfiguration))
+      .build()
+  }
+
+}
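For anyone trying this patch out locally: a minimal usage sketch of the new PartitionBucketIndexManager procedure, run from a spark-shell that already has Hudi's SQL extensions loaded and an existing partition-bucket-index table. The table name, regex expression, and bucket numbers below are illustrative only; the parameter names (table, overwrite, bucket-number, add, dry-run, rollback, show-config, rule) are the ones declared by this patch, with dry-run defaulting to true and show-config to false.

  // inspect the hashing config currently in effect (placeholder output in this revision)
  spark.sql("call PartitionBucketIndexManager(table => 'hudi_bucket_tbl', show-config => true)").show()

  // rescale: partitions matching the regex get 64 buckets, everything else keeps bucket-number
  spark.sql(
    """call PartitionBucketIndexManager(
      |  table => 'hudi_bucket_tbl',
      |  overwrite => '2021-06-(01|02),64',
      |  rule => 'regex',
      |  bucket-number => 16,
      |  dry-run => false)""".stripMargin).show()

Per handleOverwrite above, the overwrite path recomputes the bucket number of every partition under the new expression, rewrites only the partitions whose bucket number changed via a BUCKET_RESCALE bulk insert, and then persists the new hashing config; note that dry-run is accepted but not yet acted on in this revision.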

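A second sketch, showing how the expression/rule pair appears to drive bucket assignment, based only on the PartitionBucketIndexCalculator and PartitionBucketIndexHashingConfig APIs touched in this patch. The instant string is illustrative and the expected values are assumptions inferred from the config fields, not asserted anywhere in the patch.

  import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig
  import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator

  // expression format used in the tests above: "<partition regex>,<bucket number>";
  // partitions matching no expression appear to fall back to the default bucket number.
  val expressions = "\\d{4}-(06-(01|17|18)|11-(01|10|11)),256"
  val hashingConfig = new PartitionBucketIndexHashingConfig(
    expressions, 10, "regex", PartitionBucketIndexHashingConfig.CURRENT_VERSION, "20250320165836000") // instant is illustrative
  val calc = PartitionBucketIndexCalculator.getInstance("20250320165836000", hashingConfig)

  calc.computeNumBuckets("2021-06-17") // expected 256: matches the regex
  calc.computeNumBuckets("2021-07-01") // expected 10: falls back to the default bucket number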