fresh-borzoni commented on code in PR #3260:
URL: https://github.com/apache/fluss/pull/3260#discussion_r3209707416
##########
fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java:
##########
@@ -37,10 +37,17 @@
public class BucketOffsetsRetrieverImpl implements
OffsetsInitializer.BucketOffsetsRetriever {
private final Admin flussAdmin;
private final TablePath tablePath;
+ private final Boolean fetchEarliestOffset;
Review Comment:
mb better to use primitive, since it doesn't allow null
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala:
##########
@@ -91,26 +92,64 @@ class FlussAppendBatch(
}
override def planInputPartitions(): Array[InputPartition] = {
- val bucketOffsetsRetrieverImpl = new BucketOffsetsRetrieverImpl(admin,
tablePath)
+ val maxRecordsPerPartition: Option[Long] = {
+ val opt =
flussConfig.getOptional(SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION)
+ if (opt.isPresent) Some(opt.get().longValue()) else None
+ }
+
+ val bucketOffsetsRetrieverImpl = maxRecordsPerPartition match {
+ case Some(_) => new BucketOffsetsRetrieverImpl(admin, tablePath, true)
+ case None => new BucketOffsetsRetrieverImpl(admin, tablePath)
+ }
val buckets = (0 until tableInfo.getNumBuckets).toSeq
+ def splitOffsetRange(
+ tableBucket: TableBucket,
+ startOffset: Long,
+ stopOffset: Long,
+ maxRecords: Long): Seq[InputPartition] = {
+ if (
+ startOffset < 0 || stopOffset <= startOffset || stopOffset <=
(startOffset + maxRecords)
Review Comment:
for the earliest mode we have sentinel -2L, I think it would result in a bug
here, since we clamp to 1 split
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala:
##########
@@ -91,26 +92,64 @@ class FlussAppendBatch(
}
override def planInputPartitions(): Array[InputPartition] = {
- val bucketOffsetsRetrieverImpl = new BucketOffsetsRetrieverImpl(admin,
tablePath)
+ val maxRecordsPerPartition: Option[Long] = {
+ val opt =
flussConfig.getOptional(SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION)
+ if (opt.isPresent) Some(opt.get().longValue()) else None
+ }
+
+ val bucketOffsetsRetrieverImpl = maxRecordsPerPartition match {
+ case Some(_) => new BucketOffsetsRetrieverImpl(admin, tablePath, true)
+ case None => new BucketOffsetsRetrieverImpl(admin, tablePath)
+ }
val buckets = (0 until tableInfo.getNumBuckets).toSeq
+ def splitOffsetRange(
+ tableBucket: TableBucket,
+ startOffset: Long,
+ stopOffset: Long,
+ maxRecords: Long): Seq[InputPartition] = {
+ if (
+ startOffset < 0 || stopOffset <= startOffset || stopOffset <=
(startOffset + maxRecords)
+ ) {
+ return Seq(
+ FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
+ .asInstanceOf[InputPartition])
+ }
+ val rangeSize = stopOffset - startOffset
+ val numSplits = ((rangeSize + maxRecords - 1) / maxRecords).toInt
+ val step = (rangeSize + numSplits - 1) / numSplits
+
+ Iterator
+ .from(0)
+ .take(numSplits)
+ .map(i => startOffset + i * step)
+ .map {
+ from =>
+ FlussAppendInputPartition(tableBucket, from, math.min(from + step,
stopOffset))
+ .asInstanceOf[InputPartition]
Review Comment:
ditto
##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala:
##########
@@ -524,4 +524,13 @@ class SparkLogTableReadTest extends FlussSparkTestBase {
assert(numRowsRead == 5L, s"Expected 5 rows read, got $numRowsRead")
}
}
+
+ test("Spark Read: split partition by config") {
Review Comment:
The test only checks row order/values, not partition count. Also what about
partitioned tables? Earliest mode?
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala:
##########
@@ -91,26 +92,64 @@ class FlussAppendBatch(
}
override def planInputPartitions(): Array[InputPartition] = {
- val bucketOffsetsRetrieverImpl = new BucketOffsetsRetrieverImpl(admin,
tablePath)
+ val maxRecordsPerPartition: Option[Long] = {
+ val opt =
flussConfig.getOptional(SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION)
+ if (opt.isPresent) Some(opt.get().longValue()) else None
+ }
+
+ val bucketOffsetsRetrieverImpl = maxRecordsPerPartition match {
+ case Some(_) => new BucketOffsetsRetrieverImpl(admin, tablePath, true)
+ case None => new BucketOffsetsRetrieverImpl(admin, tablePath)
+ }
val buckets = (0 until tableInfo.getNumBuckets).toSeq
+ def splitOffsetRange(
+ tableBucket: TableBucket,
+ startOffset: Long,
+ stopOffset: Long,
+ maxRecords: Long): Seq[InputPartition] = {
+ if (
+ startOffset < 0 || stopOffset <= startOffset || stopOffset <=
(startOffset + maxRecords)
+ ) {
+ return Seq(
+ FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
+ .asInstanceOf[InputPartition])
+ }
+ val rangeSize = stopOffset - startOffset
+ val numSplits = ((rangeSize + maxRecords - 1) / maxRecords).toInt
+ val step = (rangeSize + numSplits - 1) / numSplits
+
+ Iterator
+ .from(0)
+ .take(numSplits)
+ .map(i => startOffset + i * step)
+ .map {
+ from =>
+ FlussAppendInputPartition(tableBucket, from, math.min(from + step,
stopOffset))
+ .asInstanceOf[InputPartition]
+ }
+ .toSeq
+ }
+
def createPartitions(
partitionId: Option[Long],
startBucketOffsets: Map[Integer, Long],
stoppingBucketOffsets: Map[Integer, Long]): Array[InputPartition] = {
- buckets.map {
+ buckets.flatMap {
bucketId =>
- val (startBucketOffset, stoppingBucketOffset) =
+ val (startOffset, stopOffset) =
(startBucketOffsets(bucketId), stoppingBucketOffsets(bucketId))
- partitionId match {
- case Some(partitionId) =>
- val tableBucket = new TableBucket(tableInfo.getTableId,
partitionId, bucketId)
- FlussAppendInputPartition(tableBucket, startBucketOffset,
stoppingBucketOffset)
- .asInstanceOf[InputPartition]
+ val tableBucket = partitionId match {
+ case Some(pid) => new TableBucket(tableInfo.getTableId, pid,
bucketId)
+ case None => new TableBucket(tableInfo.getTableId, bucketId)
+ }
+ maxRecordsPerPartition match {
+ case Some(maxRecs) =>
+ splitOffsetRange(tableBucket, startOffset, stopOffset, maxRecs)
case None =>
- val tableBucket = new TableBucket(tableInfo.getTableId, bucketId)
- FlussAppendInputPartition(tableBucket, startBucketOffset,
stoppingBucketOffset)
- .asInstanceOf[InputPartition]
+ Seq(
+ FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
+ .asInstanceOf[InputPartition])
Review Comment:
ditto
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala:
##########
@@ -91,26 +92,64 @@ class FlussAppendBatch(
}
override def planInputPartitions(): Array[InputPartition] = {
- val bucketOffsetsRetrieverImpl = new BucketOffsetsRetrieverImpl(admin,
tablePath)
+ val maxRecordsPerPartition: Option[Long] = {
+ val opt =
flussConfig.getOptional(SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION)
+ if (opt.isPresent) Some(opt.get().longValue()) else None
+ }
+
+ val bucketOffsetsRetrieverImpl = maxRecordsPerPartition match {
+ case Some(_) => new BucketOffsetsRetrieverImpl(admin, tablePath, true)
+ case None => new BucketOffsetsRetrieverImpl(admin, tablePath)
+ }
val buckets = (0 until tableInfo.getNumBuckets).toSeq
+ def splitOffsetRange(
+ tableBucket: TableBucket,
+ startOffset: Long,
+ stopOffset: Long,
+ maxRecords: Long): Seq[InputPartition] = {
+ if (
+ startOffset < 0 || stopOffset <= startOffset || stopOffset <=
(startOffset + maxRecords)
+ ) {
+ return Seq(
+ FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
+ .asInstanceOf[InputPartition])
Review Comment:
nit: I think it's redundant
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]