This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 42c12289287 [HUDI-9379] Spark RemotePartitioner For BucketIndex 
(#13265)
42c12289287 is described below

commit 42c1228928794b3c689849d495fe5d21abe1a45f
Author: YueZhang <[email protected]>
AuthorDate: Mon May 19 11:44:11 2025 +0800

    [HUDI-9379] Spark RemotePartitioner For BucketIndex (#13265)
---
 .../client/embedded/EmbeddedTimelineService.java   |  4 +
 .../org/apache/hudi/config/HoodieIndexConfig.java  | 11 +++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 +
 .../BucketIndexBulkInsertPartitionerWithRows.java  | 21 +++++-
 .../apache/spark/sql/BucketPartitionUtils.scala    | 34 ++++++---
 .../hudi/common/util/RemotePartitionHelper.java    | 88 ++++++++++++++++++++++
 .../DatasetBucketRescaleCommitActionExecutor.java  |  2 +-
 .../hudi/functional/TestBucketIndexSupport.scala   | 56 +++++++++++++-
 .../spark/sql/hudi/dml/TestInsertTable.scala       | 45 +++++++++++
 .../hudi/timeline/service/RequestHandler.java      | 19 +++++
 .../hudi/timeline/service/TimelineService.java     | 10 +++
 .../service/handlers/RemotePartitionerHandler.java | 59 +++++++++++++++
 12 files changed, 338 insertions(+), 15 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index e9c7f5ca8b9..d92334756df 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -170,6 +170,10 @@ public class EmbeddedTimelineService {
                   * writeConfig.getHoodieClientHeartbeatTolerableMisses());
     }
 
+    if (writeConfig.isUsingRemotePartitioner()) {
+      timelineServiceConfBuilder.enableRemotePartitioner(true);
+    }
+
     this.serviceConfig = timelineServiceConfBuilder.build();
 
     server = timelineServiceCreator.create(context, storageConf.newInstance(), 
serviceConfig, viewManager);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 9aac9cf3534..1932e63cd31 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -301,6 +301,12 @@ public class HoodieIndexConfig extends HoodieConfig {
       .withDocumentation("Only applies if index type is BUCKET. Determine the 
number of buckets in the hudi table, "
           + "and each partition is divided to N buckets.");
 
+  public static final ConfigProperty<Boolean> BUCKET_PARTITIONER = 
ConfigProperty
+      .key("hoodie.bucket.index.remote.partitioner.enable")
+      .defaultValue(false)
+      .withDocumentation("Use a remote partitioner that centrally allocates partition "
+          + "IDs for bucket-based repartitioning, aiming to resolve data "
+          + "skew. Defaults to the local hash partitioner.");
+
   public static final ConfigProperty<String> BUCKET_INDEX_PARTITION_RULE_TYPE 
= ConfigProperty
       .key("hoodie.bucket.index.partition.rule.type")
       .defaultValue(PartitionBucketIndexRule.REGEX.name)
@@ -728,6 +734,11 @@ public class HoodieIndexConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder enableBucketRemotePartitioner(boolean 
enableRemotePartitioner) {
+      hoodieIndexConfig.setValue(BUCKET_PARTITIONER, 
String.valueOf(enableRemotePartitioner));
+      return this;
+    }
+
     public Builder withBucketMinNum(int bucketMinNum) {
       hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, 
String.valueOf(bucketMinNum));
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f69a8fd25ba..922ba350284 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2265,6 +2265,10 @@ public class HoodieWriteConfig extends HoodieConfig {
   public String getRecordIndexInputStorageLevel() {
     return 
getStringOrDefault(HoodieIndexConfig.RECORD_INDEX_INPUT_STORAGE_LEVEL_VALUE);
   }
+  
+  public boolean isUsingRemotePartitioner() {
+    return getBoolean(HoodieIndexConfig.BUCKET_PARTITIONER);
+  }
 
   /**
    * storage properties.
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
index db7a64de69f..9816febb041 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -19,10 +19,12 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
+import org.apache.spark.Partitioner;
 import org.apache.spark.sql.BucketPartitionUtils$;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -34,21 +36,34 @@ public class BucketIndexBulkInsertPartitionerWithRows 
implements BulkInsertParti
 
   private final String indexKeyFields;
   private final NumBucketsFunction numBucketsFunction;
+  private final HoodieWriteConfig writeConfig;
+  private FileSystemViewStorageConfig viewConfig;
 
   public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, 
HoodieWriteConfig writeConfig) {
     this.indexKeyFields = indexKeyFields;
     this.numBucketsFunction = NumBucketsFunction.fromWriteConfig(writeConfig);
+    this.writeConfig = writeConfig;
+    if (writeConfig.isUsingRemotePartitioner()) {
+      this.viewConfig = writeConfig.getViewStorageConfig();
+    }
   }
 
-  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, 
PartitionBucketIndexHashingConfig hashingConfig) {
-    this.indexKeyFields = indexKeyFields;
+  public BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig 
writeConfig, PartitionBucketIndexHashingConfig hashingConfig) {
+    this.indexKeyFields = writeConfig.getBucketIndexHashFieldWithDefault();
     this.numBucketsFunction = new 
NumBucketsFunction(hashingConfig.getExpressions(),
         hashingConfig.getRule(), hashingConfig.getDefaultBucketNumber());
+    this.writeConfig = writeConfig;
+    if (writeConfig.isUsingRemotePartitioner()) {
+      this.viewConfig = writeConfig.getViewStorageConfig();
+    }
   }
 
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputPartitions) {
-    return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, 
numBucketsFunction, outputPartitions);
+    Partitioner partitioner = writeConfig.isUsingRemotePartitioner() && 
writeConfig.isEmbeddedTimelineServerEnabled()
+        ? BucketPartitionUtils$.MODULE$.getRemotePartitioner(viewConfig, 
numBucketsFunction, outputPartitions) 
+        : 
BucketPartitionUtils$.MODULE$.getLocalePartitioner(numBucketsFunction, 
outputPartitions);
+    return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, 
numBucketsFunction, partitioner);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
index ecdf406871f..9e3784cf0ca 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
@@ -19,7 +19,8 @@
 package org.apache.spark.sql
 
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.Functions
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
+import org.apache.hudi.common.util.{Functions, RemotePartitionHelper}
 import org.apache.hudi.common.util.hash.BucketIndexUtil
 import org.apache.hudi.index.bucket.BucketIdentifier
 import org.apache.hudi.index.bucket.partition.NumBucketsFunction
@@ -28,7 +29,7 @@ import org.apache.spark.Partitioner
 import org.apache.spark.sql.catalyst.InternalRow
 
 object BucketPartitionUtils {
-  def createDataFrame(df: DataFrame, indexKeyFields: String, 
numBucketsFunction: NumBucketsFunction, partitionNum: Int): DataFrame = {
+  def createDataFrame(df: DataFrame, indexKeyFields: String, 
numBucketsFunction: NumBucketsFunction, partitioner: Partitioner): DataFrame = {
     def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => {
       val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
       val kb = BucketIdentifier
@@ -42,8 +43,29 @@ object BucketPartitionUtils {
     }
 
     val getPartitionKey = getPartitionKeyExtractor()
-    val partitioner = new Partitioner {
+    // use internalRow to avoid extra convert.
+    val reRdd = df.queryExecution.toRdd
+      .keyBy(row => getPartitionKey(row))
+      .repartitionAndSortWithinPartitions(partitioner)
+      .values
+    df.sparkSession.internalCreateDataFrame(reRdd, df.schema)
+  }
+
+  def getRemotePartitioner(viewConf: FileSystemViewStorageConfig, 
numBucketsFunction: NumBucketsFunction, partitionNum: Int): Partitioner = {
+    new Partitioner {
+      private val helper = new RemotePartitionHelper(viewConf)
+
+      override def numPartitions: Int = partitionNum
+
+      override def getPartition(value: Any): Int = {
+        val partitionKeyPair = value.asInstanceOf[(String, Int)]
+        
helper.getPartition(numBucketsFunction.getNumBuckets(partitionKeyPair._1), 
partitionKeyPair._1, partitionKeyPair._2, partitionNum)
+      }
+    }
+  }
 
+  def getLocalePartitioner(numBucketsFunction: NumBucketsFunction, 
partitionNum: Int): Partitioner = {
+    new Partitioner {
       private val partitionIndexFunc: Functions.Function3[Integer, String, 
Integer, Integer] =
         BucketIndexUtil.getPartitionIndexFunc(partitionNum)
 
@@ -54,11 +76,5 @@ object BucketPartitionUtils {
         
partitionIndexFunc.apply(numBucketsFunction.getNumBuckets(partitionKeyPair._1), 
partitionKeyPair._1, partitionKeyPair._2)
       }
     }
-    // use internalRow to avoid extra convert.
-    val reRdd = df.queryExecution.toRdd
-      .keyBy(row => getPartitionKey(row))
-      .repartitionAndSortWithinPartitions(partitioner)
-      .values
-    df.sparkSession.internalCreateDataFrame(reRdd, df.schema)
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java
new file mode 100644
index 00000000000..626f443fd22
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.Consts;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+public class RemotePartitionHelper implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RemotePartitionHelper.class);
+
+  public static final String URL = "/v1/hoodie/partitioner/getpartitionindex";
+  public static final String NUM_BUCKETS_PARAM = "numbuckets";
+  public static final String PARTITION_PATH_PARAM = "partitionpath";
+  public static final String PARTITION_NUM_PARAM = "partitionnum";
+  private final RetryHelper retryHelper;
+  private final String serverHost;
+  private final Integer serverPort;
+  private final ObjectMapper mapper;
+  private final int timeoutMs;
+  private final HashMap<String, Integer> cache; // dataPartition -> 
sparkPartitionIndex
+  public RemotePartitionHelper(FileSystemViewStorageConfig viewConf) {
+    this.retryHelper = new RetryHelper(
+        viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
+        viewConf.getRemoteTimelineClientMaxRetryNumbers(),
+        viewConf.getRemoteTimelineInitialRetryIntervalMs(),
+        viewConf.getRemoteTimelineClientRetryExceptions(),
+        "Sending request");
+    this.serverHost = viewConf.getRemoteViewServerHost();
+    this.serverPort = viewConf.getRemoteViewServerPort();
+    this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
+    this.mapper = new ObjectMapper();
+    this.cache = new HashMap<>();
+  }
+
+  public int getPartition(int numBuckets, String partitionPath, int curBucket, 
int partitionNum) throws Exception {
+    if (cache.containsKey(partitionPath)) {
+      return computeActualPartition(cache.get(partitionPath), curBucket, 
partitionNum);
+    }
+    URIBuilder builder =
+        new 
URIBuilder().setHost(serverHost).setPort(serverPort).setPath(URL).setScheme("http");
+
+    builder.addParameter(NUM_BUCKETS_PARAM, String.valueOf(numBuckets));
+    builder.addParameter(PARTITION_PATH_PARAM, partitionPath);
+    builder.addParameter(PARTITION_NUM_PARAM, String.valueOf(partitionNum));
+
+    String url = builder.toString();
+    LOG.debug("Sending request : (" + url + ").");
+    Response response = (Response)(retryHelper != null ? retryHelper.start(() 
-> 
Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute())
+        : 
Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute());
+    String content = response.returnContent().asString(Consts.UTF_8);
+    int partitionIndex = Integer.parseInt(mapper.readValue(content, new 
TypeReference<String>() {}));
+    cache.put(partitionPath, partitionIndex);
+    return computeActualPartition(partitionIndex, curBucket, partitionNum);
+  }
+
+  private int computeActualPartition(int startOffset, int curBucket, int 
partitionNum) {
+    int res = startOffset + curBucket;
+    return res >= partitionNum ? res % partitionNum : res;
+  }
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index ab623643a62..449283d872c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -59,7 +59,7 @@ public class DatasetBucketRescaleCommitActionExecutor extends 
DatasetBulkInsertO
    */
   @Override
   protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean 
populateMetaFields, boolean isTablePartitioned) {
-    return new 
BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(),
 hashingConfig);
+    return new 
BucketIndexBulkInsertPartitionerWithRows(writeClient.getConfig(), 
hashingConfig);
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
index 67915f9a588..adf34641989 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
@@ -22,21 +22,27 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, 
TypedProperties}
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.common.table.view.{FileSystemViewManager, 
FileSystemViewStorageConfig}
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction
 import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.timeline.service.TimelineService
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession}
+import org.apache.spark.sql.{BucketPartitionUtils, 
HoodieCatalystExpressionUtils, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
 
+import java.util
+
 @Tag("functional")
 class TestBucketIndexSupport extends HoodieSparkClientTestBase with 
PredicateHelper {
 
@@ -78,6 +84,52 @@ class TestBucketIndexSupport extends 
HoodieSparkClientTestBase with PredicateHel
     cleanupSparkContexts()
   }
 
+  @Test
+  def testBucketIndexRemotePartitioner(): Unit = {
+    val numBuckets = 511
+    val config = HoodieWriteConfig.newBuilder
+      .withPath(basePath).
+      withIndexConfig(HoodieIndexConfig.newBuilder()
+        .withBucketNum(numBuckets.toString)
+        .enableBucketRemotePartitioner(true).build())
+      .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder
+        .withRemoteServerPort(incrementTimelineServicePortToUse).build)
+      .build
+
+    val timelineService = new TimelineService(context, 
HadoopFSUtils.getStorageConf,
+      
TimelineService.Config.builder.enableMarkerRequests(true).enableRemotePartitioner(true)
+      .serverPort(config.getViewStorageConfig.getRemoteViewServerPort).build,
+      FileSystemViewManager.createViewManager(context, 
config.getMetadataConfig, config.getViewStorageConfig, config.getCommonConfig))
+
+    timelineService.startService
+    this.timelineService = timelineService
+
+    timelineService.startService()
+    val numBucketsFunction = NumBucketsFunction.fromWriteConfig(config)
+    val partitionNum = 1533
+    val partitioner = 
BucketPartitionUtils.getRemotePartitioner(config.getViewStorageConfig, 
numBucketsFunction, partitionNum)
+    val dataPartitions = List("dt=20250501", "dt=20250502", "dt=20250503", 
"dt=20250504", "dt=20250505", "dt=20250506")
+    val res = new util.HashMap[Int, Int]
+    dataPartitions.foreach(dataPartition => {
+      for (i <- 1 to numBuckets) {
+        // for bucket id from 1 to numBuckets
+        // mock 1533 spark partitions
+        val sparkPartition = partitioner.getPartition((dataPartition, i))
+        if (res.containsKey(sparkPartition)) {
+          val value = res.get(sparkPartition) + 1
+          res.put(sparkPartition, value)
+        } else {
+          res.put(sparkPartition, 1)
+        }
+      }
+    })
+    timelineService.close()
+
+    res.values().stream().forEach(value => {
+      assert(value == (numBuckets*dataPartitions.size/partitionNum))
+    })
+  }
+
   @Test
   def testSingleHashFieldsExpression: Unit = {
     val bucketNumber = 19
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 474c7f3c018..fd0e7b328c1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1890,6 +1890,51 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Bulk Insert Into Bucket Index Table With Remote Partitioner") {
+    withSQLConf(
+      "hoodie.datasource.write.operation" -> "bulk_insert",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             | primaryKey = 'id,name',
+             | type = 'cow',
+             | preCombineField = 'ts',
+             | hoodie.index.type = 'BUCKET',
+             | hoodie.bucket.index.hash.field = 'id,name',
+             | hoodie.bucket.index.remote.partitioner.enable = 'true')
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+        // Note: do not use field aliases; the partition field must be
+placed last.
+        spark.sql(
+          s"""
+             | insert into $tableName values
+             | (1, 'a1,1', 10, 1000, "2021-01-05"),
+             | (2, 'a2', 20, 2000, "2021-01-06"),
+             | (3, 'a3,3', 30, 3000, "2021-01-07")
+       """.stripMargin)
+
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+        )
+      }
+    }
+  }
+
   test("Test Insert Overwrite Bucket Index Table") {
     withSQLConf(
       "hoodie.datasource.write.operation" -> "bulk_insert",
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 701e92e3902..8050c62057f 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -35,12 +35,14 @@ import 
org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.RemotePartitionHelper;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
 import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
 import org.apache.hudi.timeline.service.handlers.MarkerHandler;
+import org.apache.hudi.timeline.service.handlers.RemotePartitionerHandler;
 import org.apache.hudi.timeline.service.handlers.TimelineHandler;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -83,6 +85,7 @@ public class RequestHandler {
   private final FileSliceHandler sliceHandler;
   private final BaseFileHandler dataFileHandler;
   private final MarkerHandler markerHandler;
+  private RemotePartitionerHandler partitionerHandler;
   private final Registry metricsRegistry = 
Registry.getRegistry("TimelineService");
   private final ScheduledExecutorService asyncResultService;
 
@@ -101,6 +104,9 @@ public class RequestHandler {
     } else {
       this.markerHandler = null;
     }
+    if (timelineServiceConfig.enableRemotePartitioner) {
+      this.partitionerHandler = new RemotePartitionerHandler(conf, 
timelineServiceConfig, viewManager);
+    }
     if (timelineServiceConfig.async) {
       this.asyncResultService = Executors.newSingleThreadScheduledExecutor();
     } else {
@@ -182,6 +188,9 @@ public class RequestHandler {
     if (markerHandler != null) {
       registerMarkerAPI();
     }
+    if (partitionerHandler != null) {
+      registerRemotePartitionerAPI();
+    }
   }
 
   public void stop() {
@@ -530,6 +539,16 @@ public class RequestHandler {
     }, false));
   }
 
+  private void registerRemotePartitionerAPI() {
+    app.get(RemotePartitionHelper.URL, new ViewHandler(ctx -> {
+      int partition = partitionerHandler.gePartitionIndex(
+          ctx.queryParamAsClass(RemotePartitionHelper.NUM_BUCKETS_PARAM, 
String.class).getOrDefault(""),
+          ctx.queryParamAsClass(RemotePartitionHelper.PARTITION_PATH_PARAM, 
String.class).getOrDefault(""),
+          ctx.queryParamAsClass(RemotePartitionHelper.PARTITION_NUM_PARAM, 
String.class).getOrDefault(""));
+      writeValueAsString(ctx, partition);
+    }, false));
+  }
+
   /**
    * Used for logging and performing refresh check.
    */
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 7dfe55b3e93..a5f3780d253 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -111,6 +111,9 @@ public class TimelineService {
     @Parameter(names = {"--enable-marker-requests", "-em"}, description = 
"Enable handling of marker-related requests")
     public boolean enableMarkerRequests = false;
 
+    @Parameter(names = {"--enable-remote-partitioner"}, description = "Enable 
remote partitioner")
+    public boolean enableRemotePartitioner = false;
+
     @Parameter(names = {"--marker-batch-threads", "-mbt"}, description = 
"Number of threads to use for batch processing marker creation requests")
     public int markerBatchNumThreads = 20;
 
@@ -185,6 +188,7 @@ public class TimelineService {
       private Long asyncConflictDetectorInitialDelayMs = 0L;
       private Long asyncConflictDetectorPeriodMs = 30000L;
       private Long maxAllowableHeartbeatIntervalInMs = 120000L;
+      private boolean enableRemotePartitioner = false;
 
       public Builder() {
       }
@@ -239,6 +243,11 @@ public class TimelineService {
         return this;
       }
 
+      public Builder enableRemotePartitioner(boolean enableRemotePartitioner) {
+        this.enableRemotePartitioner = enableRemotePartitioner;
+        return this;
+      }
+
       public Builder markerBatchNumThreads(int markerBatchNumThreads) {
         this.markerBatchNumThreads = markerBatchNumThreads;
         return this;
@@ -296,6 +305,7 @@ public class TimelineService {
         config.async = this.async;
         config.compress = this.compress;
         config.enableMarkerRequests = this.enableMarkerRequests;
+        config.enableRemotePartitioner = this.enableRemotePartitioner;
         config.markerBatchNumThreads = this.markerBatchNumThreads;
         config.markerBatchIntervalMs = this.markerBatchIntervalMs;
         config.markerParallelism = this.markerParallelism;
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/RemotePartitionerHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/RemotePartitionerHandler.java
new file mode 100644
index 00000000000..74d930b8d80
--- /dev/null
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/RemotePartitionerHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RemotePartitionerHandler extends Handler {
+
+  // cache Map<PartitionPath, BucketStartIndex>
+  private final ConcurrentHashMap<String, Integer> cache;
+  private final AtomicInteger nextIndex = new AtomicInteger(0);
+
+  public RemotePartitionerHandler(StorageConfiguration<?> conf, 
TimelineService.Config timelineServiceConfig,
+                                  FileSystemViewManager viewManager) {
+    super(conf, timelineServiceConfig, viewManager);
+    this.cache = new ConcurrentHashMap<>();
+  }
+
+  public int gePartitionIndex(String numBuckets, String partitionPath, String 
partitionNum) {
+    int num = Integer.parseInt(numBuckets);
+    int partNum = Integer.parseInt(partitionNum);
+
+    return cache.computeIfAbsent(partitionPath, key -> {
+      int current;
+      int newNext;
+      int res;
+      do {
+        current = nextIndex.get();
+        res = current;
+        newNext = current + num;
+        if (newNext >= partNum) {
+          newNext = newNext % partNum;
+        }
+      } while (!nextIndex.compareAndSet(current, newNext));
+      return res;
+    });
+  }
+}

Reply via email to