This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 42c12289287 [HUDI-9379] Spark RemotePartitioner For BucketIndex
(#13265)
42c12289287 is described below
commit 42c1228928794b3c689849d495fe5d21abe1a45f
Author: YueZhang <[email protected]>
AuthorDate: Mon May 19 11:44:11 2025 +0800
[HUDI-9379] Spark RemotePartitioner For BucketIndex (#13265)
---
.../client/embedded/EmbeddedTimelineService.java | 4 +
.../org/apache/hudi/config/HoodieIndexConfig.java | 11 +++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../BucketIndexBulkInsertPartitionerWithRows.java | 21 +++++-
.../apache/spark/sql/BucketPartitionUtils.scala | 34 ++++++---
.../hudi/common/util/RemotePartitionHelper.java | 88 ++++++++++++++++++++++
.../DatasetBucketRescaleCommitActionExecutor.java | 2 +-
.../hudi/functional/TestBucketIndexSupport.scala | 56 +++++++++++++-
.../spark/sql/hudi/dml/TestInsertTable.scala | 45 +++++++++++
.../hudi/timeline/service/RequestHandler.java | 19 +++++
.../hudi/timeline/service/TimelineService.java | 10 +++
.../service/handlers/RemotePartitionerHandler.java | 59 +++++++++++++++
12 files changed, 338 insertions(+), 15 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index e9c7f5ca8b9..d92334756df 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -170,6 +170,10 @@ public class EmbeddedTimelineService {
* writeConfig.getHoodieClientHeartbeatTolerableMisses());
}
+ if (writeConfig.isUsingRemotePartitioner()) {
+ timelineServiceConfBuilder.enableRemotePartitioner(true);
+ }
+
this.serviceConfig = timelineServiceConfBuilder.build();
server = timelineServiceCreator.create(context, storageConf.newInstance(),
serviceConfig, viewManager);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 9aac9cf3534..1932e63cd31 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -301,6 +301,12 @@ public class HoodieIndexConfig extends HoodieConfig {
.withDocumentation("Only applies if index type is BUCKET. Determine the
number of buckets in the hudi table, "
+ "and each partition is divided to N buckets.");
+  public static final ConfigProperty<Boolean> BUCKET_PARTITIONER = ConfigProperty
+      .key("hoodie.bucket.index.remote.partitioner.enable")
+      .defaultValue(false)
+      .withDocumentation("Only applies if index type is BUCKET. When enabled, bucket index bulk insert with the row "
+          + "writer repartitions records with a remote partitioner: the timeline server centrally allocates the "
+          + "starting Spark partition of each data partition's buckets, spreading buckets of different data "
+          + "partitions over all Spark partitions to reduce data skew. Takes effect only when the embedded "
+          + "timeline server is enabled. By default (false) the local hash partitioner is used.");
+
public static final ConfigProperty<String> BUCKET_INDEX_PARTITION_RULE_TYPE
= ConfigProperty
.key("hoodie.bucket.index.partition.rule.type")
.defaultValue(PartitionBucketIndexRule.REGEX.name)
@@ -728,6 +734,11 @@ public class HoodieIndexConfig extends HoodieConfig {
return this;
}
+    /**
+     * Turns the bucket index remote partitioner on or off
+     * ({@code hoodie.bucket.index.remote.partitioner.enable}).
+     *
+     * @param enableRemotePartitioner whether to use the remote partitioner
+     * @return this builder
+     */
+    public Builder enableBucketRemotePartitioner(boolean enableRemotePartitioner) {
+      String flag = Boolean.toString(enableRemotePartitioner);
+      hoodieIndexConfig.setValue(BUCKET_PARTITIONER, flag);
+      return this;
+    }
+
public Builder withBucketMinNum(int bucketMinNum) {
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS,
String.valueOf(bucketMinNum));
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f69a8fd25ba..922ba350284 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2265,6 +2265,10 @@ public class HoodieWriteConfig extends HoodieConfig {
public String getRecordIndexInputStorageLevel() {
return
getStringOrDefault(HoodieIndexConfig.RECORD_INDEX_INPUT_STORAGE_LEVEL_VALUE);
}
+
+  /**
+   * Tells whether bucket index repartitioning should go through the remote partitioner,
+   * as configured by {@link HoodieIndexConfig#BUCKET_PARTITIONER}.
+   */
+  public boolean isUsingRemotePartitioner() {
+    final Boolean remotePartitionerEnabled = getBoolean(HoodieIndexConfig.BUCKET_PARTITIONER);
+    return remotePartitionerEnabled;
+  }
/**
* storage properties.
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
index db7a64de69f..9816febb041 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -19,10 +19,12 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
import org.apache.spark.sql.BucketPartitionUtils$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -34,21 +36,34 @@ public class BucketIndexBulkInsertPartitionerWithRows
implements BulkInsertParti
private final String indexKeyFields;
private final NumBucketsFunction numBucketsFunction;
+ private final HoodieWriteConfig writeConfig;
+ private FileSystemViewStorageConfig viewConfig;
public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields,
HoodieWriteConfig writeConfig) {
this.indexKeyFields = indexKeyFields;
this.numBucketsFunction = NumBucketsFunction.fromWriteConfig(writeConfig);
+ this.writeConfig = writeConfig;
+    // viewConfig (timeline server host/port, timeout and retry settings) is only read by the remote
+    // partitioner, so it is captured only when that partitioner is enabled; otherwise it stays null.
+ if (writeConfig.isUsingRemotePartitioner()) {
+ this.viewConfig = writeConfig.getViewStorageConfig();
+ }
}
- public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields,
PartitionBucketIndexHashingConfig hashingConfig) {
- this.indexKeyFields = indexKeyFields;
+  /**
+   * Creates a partitioner whose bucket counts come from {@code hashingConfig} (used by bucket rescale)
+   * and whose hash fields come from {@code writeConfig}.
+   */
+ public BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig
writeConfig, PartitionBucketIndexHashingConfig hashingConfig) {
+ this.indexKeyFields = writeConfig.getBucketIndexHashFieldWithDefault();
this.numBucketsFunction = new
NumBucketsFunction(hashingConfig.getExpressions(),
hashingConfig.getRule(), hashingConfig.getDefaultBucketNumber());
+ this.writeConfig = writeConfig;
+    // Same as above: viewConfig is only needed by the remote partitioner.
+ if (writeConfig.isUsingRemotePartitioner()) {
+ this.viewConfig = writeConfig.getViewStorageConfig();
+ }
}
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> rows, int
outputPartitions) {
- return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields,
numBucketsFunction, outputPartitions);
+    // Use the timeline-server-backed remote partitioner only when it is enabled AND the embedded
+    // timeline server is enabled; otherwise fall back to the local partitioner.
+    // NOTE(review): the fallback is silent when the remote partitioner is requested without an
+    // embedded timeline server; consider logging a warning.
+ Partitioner partitioner = writeConfig.isUsingRemotePartitioner() &&
writeConfig.isEmbeddedTimelineServerEnabled()
+ ? BucketPartitionUtils$.MODULE$.getRemotePartitioner(viewConfig,
numBucketsFunction, outputPartitions)
+ :
BucketPartitionUtils$.MODULE$.getLocalePartitioner(numBucketsFunction,
outputPartitions);
+ return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields,
numBucketsFunction, partitioner);
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
index ecdf406871f..9e3784cf0ca 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
@@ -19,7 +19,8 @@
package org.apache.spark.sql
import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.Functions
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
+import org.apache.hudi.common.util.{Functions, RemotePartitionHelper}
import org.apache.hudi.common.util.hash.BucketIndexUtil
import org.apache.hudi.index.bucket.BucketIdentifier
import org.apache.hudi.index.bucket.partition.NumBucketsFunction
@@ -28,7 +29,7 @@ import org.apache.spark.Partitioner
import org.apache.spark.sql.catalyst.InternalRow
object BucketPartitionUtils {
- def createDataFrame(df: DataFrame, indexKeyFields: String,
numBucketsFunction: NumBucketsFunction, partitionNum: Int): DataFrame = {
+ def createDataFrame(df: DataFrame, indexKeyFields: String,
numBucketsFunction: NumBucketsFunction, partitioner: Partitioner): DataFrame = {
def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => {
val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
val kb = BucketIdentifier
@@ -42,8 +43,29 @@ object BucketPartitionUtils {
}
val getPartitionKey = getPartitionKeyExtractor()
- val partitioner = new Partitioner {
+ // use internalRow to avoid extra convert.
+ val reRdd = df.queryExecution.toRdd
+ .keyBy(row => getPartitionKey(row))
+ .repartitionAndSortWithinPartitions(partitioner)
+ .values
+ df.sparkSession.internalCreateDataFrame(reRdd, df.schema)
+ }
+
+  /**
+   * Creates a Spark [[Partitioner]] backed by the timeline server: for every data partition the
+   * [[RemotePartitionHelper]] fetches (once) the start offset of its buckets, and a bucket is routed
+   * to `(startOffset + bucketId) % partitionNum`.
+   *
+   * @param viewConf           view storage config with the timeline server host/port and retry settings
+   * @param numBucketsFunction resolves the number of buckets of a data partition path
+   * @param partitionNum       number of Spark partitions to produce
+   * @return a partitioner whose keys are `(partitionPath, bucketId)` tuples
+   */
+  def getRemotePartitioner(viewConf: FileSystemViewStorageConfig,
+                           numBucketsFunction: NumBucketsFunction,
+                           partitionNum: Int): Partitioner = {
+    val remote: Partitioner = new Partitioner {
+      private val helper = new RemotePartitionHelper(viewConf)
+
+      override def numPartitions: Int = partitionNum
+
+      override def getPartition(key: Any): Int = {
+        val pair = key.asInstanceOf[(String, Int)]
+        val partitionPath = pair._1
+        val bucketId = pair._2
+        val bucketCount = numBucketsFunction.getNumBuckets(partitionPath)
+        helper.getPartition(bucketCount, partitionPath, bucketId, partitionNum)
+      }
+    }
+    remote
+  }
+ def getLocalePartitioner(numBucketsFunction: NumBucketsFunction,
partitionNum: Int): Partitioner = {
+ new Partitioner {
private val partitionIndexFunc: Functions.Function3[Integer, String,
Integer, Integer] =
BucketIndexUtil.getPartitionIndexFunc(partitionNum)
@@ -54,11 +76,5 @@ object BucketPartitionUtils {
partitionIndexFunc.apply(numBucketsFunction.getNumBuckets(partitionKeyPair._1),
partitionKeyPair._1, partitionKeyPair._2)
}
}
- // use internalRow to avoid extra convert.
- val reRdd = df.queryExecution.toRdd
- .keyBy(row => getPartitionKey(row))
- .repartitionAndSortWithinPartitions(partitioner)
- .values
- df.sparkSession.internalCreateDataFrame(reRdd, df.schema)
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java
new file mode 100644
index 00000000000..626f443fd22
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.Consts;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * Client-side helper of the bucket index remote partitioner.
+ *
+ * <p>For each data partition path it asks the timeline server (endpoint {@link #URL}) once for the
+ * start offset of that partition's buckets, caches the answer, and maps bucket {@code b} to Spark
+ * partition {@code (startOffset + b) % partitionNum}.
+ *
+ * <p>NOTE: the cache is a plain {@link HashMap}, so an instance is not thread-safe.
+ */
+public class RemotePartitionHelper implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(RemotePartitionHelper.class);
+
+  public static final String URL = "/v1/hoodie/partitioner/getpartitionindex";
+  public static final String NUM_BUCKETS_PARAM = "numbuckets";
+  public static final String PARTITION_PATH_PARAM = "partitionpath";
+  public static final String PARTITION_NUM_PARAM = "partitionnum";
+
+  private final RetryHelper retryHelper;
+  private final String serverHost;
+  private final Integer serverPort;
+  private final ObjectMapper mapper;
+  private final int timeoutMs;
+  // data partition path -> start offset of its buckets, as returned by the timeline server
+  private final HashMap<String, Integer> cache;
+
+  /**
+   * Creates a helper that talks to the timeline server described by {@code viewConf}.
+   *
+   * @param viewConf view storage config supplying the server host/port, request timeout and retry settings
+   */
+  public RemotePartitionHelper(FileSystemViewStorageConfig viewConf) {
+    this.retryHelper = new RetryHelper(
+        viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
+        viewConf.getRemoteTimelineClientMaxRetryNumbers(),
+        viewConf.getRemoteTimelineInitialRetryIntervalMs(),
+        viewConf.getRemoteTimelineClientRetryExceptions(),
+        "Sending request");
+    this.serverHost = viewConf.getRemoteViewServerHost();
+    this.serverPort = viewConf.getRemoteViewServerPort();
+    this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
+    this.mapper = new ObjectMapper();
+    this.cache = new HashMap<>();
+  }
+
+  /**
+   * Returns the Spark partition index for bucket {@code curBucket} of data partition {@code partitionPath}.
+   * Only the first call for a given partition path contacts the timeline server.
+   *
+   * @param numBuckets    number of buckets of the data partition (sent to the server on a cache miss)
+   * @param partitionPath data partition path
+   * @param curBucket     bucket id within the data partition
+   * @param partitionNum  total number of Spark partitions
+   * @return the Spark partition index; in {@code [0, partitionNum)} for non-negative offsets and bucket ids
+   * @throws Exception if the request to the timeline server fails or its response cannot be parsed
+   */
+  public int getPartition(int numBuckets, String partitionPath, int curBucket, int partitionNum) throws Exception {
+    Integer startOffset = cache.get(partitionPath);
+    if (startOffset == null) {
+      startOffset = fetchStartOffset(numBuckets, partitionPath, partitionNum);
+      cache.put(partitionPath, startOffset);
+    }
+    return computeActualPartition(startOffset, curBucket, partitionNum);
+  }
+
+  /** Asks the timeline server for the start offset assigned to {@code partitionPath}'s buckets. */
+  private int fetchStartOffset(int numBuckets, String partitionPath, int partitionNum) throws Exception {
+    URIBuilder builder = new URIBuilder()
+        .setHost(serverHost).setPort(serverPort).setPath(URL).setScheme("http");
+    builder.addParameter(NUM_BUCKETS_PARAM, String.valueOf(numBuckets));
+    builder.addParameter(PARTITION_PATH_PARAM, partitionPath);
+    builder.addParameter(PARTITION_NUM_PARAM, String.valueOf(partitionNum));
+
+    String url = builder.toString();
+    LOG.debug("Sending request : ({}).", url);
+    // retryHelper is always initialized in the constructor, so there is no un-retried fallback path.
+    Response response = (Response) retryHelper.start(
+        () -> Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute());
+    String content = response.returnContent().asString(Consts.UTF_8);
+    // The server returns the offset as a JSON number; read it as an Integer directly instead of
+    // relying on Jackson's number-to-String coercion followed by Integer.parseInt.
+    return mapper.readValue(content, Integer.class);
+  }
+
+  /** Offsets the bucket from the partition's start and wraps around the Spark partition count. */
+  private int computeActualPartition(int startOffset, int curBucket, int partitionNum) {
+    return (startOffset + curBucket) % partitionNum;
+  }
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index ab623643a62..449283d872c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -59,7 +59,7 @@ public class DatasetBucketRescaleCommitActionExecutor extends
DatasetBulkInsertO
*/
@Override
protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean
populateMetaFields, boolean isTablePartitioned) {
- return new
BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(),
hashingConfig);
+ return new
BucketIndexBulkInsertPartitionerWithRows(writeClient.getConfig(),
hashingConfig);
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
index 67915f9a588..adf34641989 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
@@ -22,21 +22,27 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig,
TypedProperties}
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.common.table.view.{FileSystemViewManager,
FileSystemViewStorageConfig}
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.timeline.service.TimelineService
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession}
+import org.apache.spark.sql.{BucketPartitionUtils,
HoodieCatalystExpressionUtils, SparkSession}
import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.types._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
+import java.util
+
@Tag("functional")
class TestBucketIndexSupport extends HoodieSparkClientTestBase with
PredicateHelper {
@@ -78,6 +84,52 @@ class TestBucketIndexSupport extends
HoodieSparkClientTestBase with PredicateHel
cleanupSparkContexts()
}
+  /**
+   * Checks that the remote partitioner spreads the buckets of several data partitions evenly:
+   * 6 data partitions x 511 buckets over 1533 (= 3 x 511) Spark partitions must put exactly
+   * 2 buckets into every Spark partition.
+   */
+  @Test
+  def testBucketIndexRemotePartitioner(): Unit = {
+    val numBuckets = 511
+    val config = HoodieWriteConfig.newBuilder
+      .withPath(basePath)
+      .withIndexConfig(HoodieIndexConfig.newBuilder()
+        .withBucketNum(numBuckets.toString)
+        .enableBucketRemotePartitioner(true).build())
+      .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder
+        .withRemoteServerPort(incrementTimelineServicePortToUse).build)
+      .build
+
+    val timelineService = new TimelineService(context, HadoopFSUtils.getStorageConf,
+      TimelineService.Config.builder.enableMarkerRequests(true).enableRemotePartitioner(true)
+        .serverPort(config.getViewStorageConfig.getRemoteViewServerPort).build,
+      FileSystemViewManager.createViewManager(context, config.getMetadataConfig, config.getViewStorageConfig,
+        config.getCommonConfig))
+
+    // Start the server exactly once, before any partitioner request is sent.
+    timelineService.startService()
+    this.timelineService = timelineService
+
+    val numBucketsFunction = NumBucketsFunction.fromWriteConfig(config)
+    val partitionNum = 1533
+    val partitioner = BucketPartitionUtils.getRemotePartitioner(config.getViewStorageConfig,
+      numBucketsFunction, partitionNum)
+    val dataPartitions = List("dt=20250501", "dt=20250502", "dt=20250503",
+      "dt=20250504", "dt=20250505", "dt=20250506")
+    // Spark partition index -> number of buckets routed to it
+    val res = new util.HashMap[Int, Int]
+    try {
+      dataPartitions.foreach(dataPartition => {
+        // exercise bucket ids 0 until numBuckets (presumably the range bucket ids take in practice)
+        for (bucketId <- 0 until numBuckets) {
+          val sparkPartition = partitioner.getPartition((dataPartition, bucketId))
+          res.put(sparkPartition, res.getOrDefault(sparkPartition, 0) + 1)
+        }
+      })
+    } finally {
+      timelineService.close()
+    }
+
+    // Every Spark partition must be used, and each must receive the same number of buckets.
+    assert(res.size() == partitionNum)
+    res.values().stream().forEach(value => {
+      assert(value == (numBuckets * dataPartitions.size / partitionNum))
+    })
+  }
+
@Test
def testSingleHashFieldsExpression: Unit = {
val bucketNumber = 19
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 474c7f3c018..fd0e7b328c1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1890,6 +1890,51 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
+  // Bulk-inserts into a partitioned BUCKET index table created with
+  // hoodie.bucket.index.remote.partitioner.enable = 'true', then checks that all rows read back.
+  // The name values contain commas (e.g. 'a1,1'), presumably to exercise the multi-field
+  // hash key 'id,name'; confirm.
+ test("Test Bulk Insert Into Bucket Index Table With Remote Partitioner") {
+ withSQLConf(
+ "hoodie.datasource.write.operation" -> "bulk_insert",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id,name',
+ | type = 'cow',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'BUCKET',
+ | hoodie.bucket.index.hash.field = 'id,name',
+ | hoodie.bucket.index.remote.partitioner.enable = 'true')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+ // Note: Do not write the field alias, the partition field must be
placed last.
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1,1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3,3', 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+ )
+ }
+ }
+ }
+
test("Test Insert Overwrite Bucket Index Table") {
withSQLConf(
"hoodie.datasource.write.operation" -> "bulk_insert",
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 701e92e3902..8050c62057f 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -35,12 +35,14 @@ import
org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.RemotePartitionHelper;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
import org.apache.hudi.timeline.service.handlers.MarkerHandler;
+import org.apache.hudi.timeline.service.handlers.RemotePartitionerHandler;
import org.apache.hudi.timeline.service.handlers.TimelineHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -83,6 +85,7 @@ public class RequestHandler {
private final FileSliceHandler sliceHandler;
private final BaseFileHandler dataFileHandler;
private final MarkerHandler markerHandler;
+ private RemotePartitionerHandler partitionerHandler;
private final Registry metricsRegistry =
Registry.getRegistry("TimelineService");
private final ScheduledExecutorService asyncResultService;
@@ -101,6 +104,9 @@ public class RequestHandler {
} else {
this.markerHandler = null;
}
+ if (timelineServiceConfig.enableRemotePartitioner) {
+ this.partitionerHandler = new RemotePartitionerHandler(conf,
timelineServiceConfig, viewManager);
+ }
if (timelineServiceConfig.async) {
this.asyncResultService = Executors.newSingleThreadScheduledExecutor();
} else {
@@ -182,6 +188,9 @@ public class RequestHandler {
if (markerHandler != null) {
registerMarkerAPI();
}
+ if (partitionerHandler != null) {
+ registerRemotePartitionerAPI();
+ }
}
public void stop() {
@@ -530,6 +539,16 @@ public class RequestHandler {
}, false));
}
+  /**
+   * Registers the GET endpoint {@link RemotePartitionHelper#URL}, which writes (via writeValueAsString)
+   * the start offset that {@link RemotePartitionerHandler} assigns to the requested data partition path.
+   *
+   * <p>Missing query parameters default to the empty string; for the numeric parameters this makes the
+   * handler's Integer.parseInt throw. How ViewHandler reports that error is not visible here.
+   */
+ private void registerRemotePartitionerAPI() {
+ app.get(RemotePartitionHelper.URL, new ViewHandler(ctx -> {
+ int partition = partitionerHandler.gePartitionIndex(
+ ctx.queryParamAsClass(RemotePartitionHelper.NUM_BUCKETS_PARAM,
String.class).getOrDefault(""),
+ ctx.queryParamAsClass(RemotePartitionHelper.PARTITION_PATH_PARAM,
String.class).getOrDefault(""),
+ ctx.queryParamAsClass(RemotePartitionHelper.PARTITION_NUM_PARAM,
String.class).getOrDefault(""));
+ writeValueAsString(ctx, partition);
+ }, false));
+ }
+
/**
* Used for logging and performing refresh check.
*/
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 7dfe55b3e93..a5f3780d253 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -111,6 +111,9 @@ public class TimelineService {
@Parameter(names = {"--enable-marker-requests", "-em"}, description =
"Enable handling of marker-related requests")
public boolean enableMarkerRequests = false;
+ @Parameter(names = {"--enable-remote-partitioner"}, description = "Enable
remote partitioner")
+ public boolean enableRemotePartitioner = false;
+
@Parameter(names = {"--marker-batch-threads", "-mbt"}, description =
"Number of threads to use for batch processing marker creation requests")
public int markerBatchNumThreads = 20;
@@ -185,6 +188,7 @@ public class TimelineService {
private Long asyncConflictDetectorInitialDelayMs = 0L;
private Long asyncConflictDetectorPeriodMs = 30000L;
private Long maxAllowableHeartbeatIntervalInMs = 120000L;
+ private boolean enableRemotePartitioner = false;
public Builder() {
}
@@ -239,6 +243,11 @@ public class TimelineService {
return this;
}
+    /**
+     * Sets Config#enableRemotePartitioner; when true, RequestHandler creates a
+     * RemotePartitionerHandler and registers the remote partitioner endpoint.
+     */
+ public Builder enableRemotePartitioner(boolean enableRemotePartitioner) {
+ this.enableRemotePartitioner = enableRemotePartitioner;
+ return this;
+ }
+
public Builder markerBatchNumThreads(int markerBatchNumThreads) {
this.markerBatchNumThreads = markerBatchNumThreads;
return this;
@@ -296,6 +305,7 @@ public class TimelineService {
config.async = this.async;
config.compress = this.compress;
config.enableMarkerRequests = this.enableMarkerRequests;
+ config.enableRemotePartitioner = this.enableRemotePartitioner;
config.markerBatchNumThreads = this.markerBatchNumThreads;
config.markerBatchIntervalMs = this.markerBatchIntervalMs;
config.markerParallelism = this.markerParallelism;
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/RemotePartitionerHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/RemotePartitionerHandler.java
new file mode 100644
index 00000000000..74d930b8d80
--- /dev/null
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/RemotePartitionerHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Timeline-server side of the bucket index remote partitioner.
+ *
+ * <p>Assigns every data partition path a start offset in the Spark partition index space. Offsets are
+ * handed out sequentially: a previously unseen partition path receives the current cursor, and the
+ * cursor then advances by that partition's bucket count, wrapping modulo the requested number of Spark
+ * partitions. Assignments are cached, so repeated requests for the same path return the same offset.
+ */
+public class RemotePartitionerHandler extends Handler {
+
+  // data partition path -> start offset of its first bucket
+  private final ConcurrentHashMap<String, Integer> cache;
+  // offset that the next previously-unseen partition path will receive
+  private final AtomicInteger nextIndex = new AtomicInteger(0);
+
+  public RemotePartitionerHandler(StorageConfiguration<?> conf, TimelineService.Config timelineServiceConfig,
+                                  FileSystemViewManager viewManager) {
+    super(conf, timelineServiceConfig, viewManager);
+    this.cache = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns the start offset assigned to {@code partitionPath}, allocating one on first request.
+   *
+   * @param numBuckets    number of buckets of the data partition, as a decimal string
+   * @param partitionPath data partition path
+   * @param partitionNum  number of Spark partitions, as a decimal string
+   * @return the start offset; in {@code [0, partitionNum)} when all requests use the same partitionNum
+   * @throws NumberFormatException if {@code numBuckets} or {@code partitionNum} is not an integer
+   * @throws ArithmeticException   if {@code partitionNum} is zero and a new offset must be allocated
+   */
+  public int getPartitionIndex(String numBuckets, String partitionPath, String partitionNum) {
+    int num = Integer.parseInt(numBuckets);
+    int partNum = Integer.parseInt(partitionNum);
+
+    // getAndUpdate performs the compare-and-set loop: it returns the current cursor and advances it by
+    // this partition's bucket count, wrapped to the Spark partition count. The update function may be
+    // re-applied under contention, so it is kept side-effect free.
+    return cache.computeIfAbsent(partitionPath,
+        key -> nextIndex.getAndUpdate(current -> (current + num) % partNum));
+  }
+
+  /**
+   * Misspelled alias of {@link #getPartitionIndex(String, String, String)}, kept for existing callers.
+   *
+   * @deprecated use {@link #getPartitionIndex(String, String, String)} instead
+   */
+  @Deprecated
+  public int gePartitionIndex(String numBuckets, String partitionPath, String partitionNum) {
+    return getPartitionIndex(numBuckets, partitionPath, partitionNum);
+  }
+}