yihua commented on code in PR #18396:
URL: https://github.com/apache/hudi/pull/18396#discussion_r3036299291
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java:
##########
@@ -392,30 +404,47 @@ private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
   @Test
   void testMapAndListBasedSparkBucketInfoGetter() {
-    List<BucketInfo> bucketInfos = Arrays.asList(new BucketInfo(BucketType.UPDATE, "bucket1", "partition1"),
-        new BucketInfo(BucketType.UPDATE, "bucket2", "partition2"));
+    List<BucketInfo> bucketInfos = Arrays.asList(new BucketInfo(BucketType.UPDATE, "bucket1", "partition1", 42),
+        new BucketInfo(BucketType.UPDATE, "bucket2", "partition2", 99));
     Map<Integer, BucketInfo> bucketInfoMap = new HashMap<>();
     bucketInfoMap.put(0, bucketInfos.get(0));
     bucketInfoMap.put(1, bucketInfos.get(1));
     MapBasedSparkBucketInfoGetter getter = new MapBasedSparkBucketInfoGetter(bucketInfoMap);
     ListBasedSparkBucketInfoGetter listGetter = new ListBasedSparkBucketInfoGetter(bucketInfos);
     assertEquals(bucketInfos.get(0), getter.getBucketInfo(0));
     assertEquals(bucketInfos.get(0), listGetter.getBucketInfo(0));
+    assertEquals(42, getter.getBucketInfo(0).getNumUpdates());
+    assertEquals(42, listGetter.getBucketInfo(0).getNumUpdates());
     assertEquals(bucketInfos.get(1), getter.getBucketInfo(1));
     assertEquals(bucketInfos.get(1), listGetter.getBucketInfo(1));
+    assertEquals(99, getter.getBucketInfo(1).getNumUpdates());
+    assertEquals(99, listGetter.getBucketInfo(1).getNumUpdates());
+  }
+
+  @Test
+  void testBucketInfoDefaultNumUpdates() {
+    BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, "file1", "partition1");
+    assertEquals(-1L, bucketInfo.getNumUpdates(), "Default numUpdates should be -1 (unknown)");
+
+    BucketInfo bucketInfoWithUpdates = new BucketInfo(BucketType.UPDATE, "file1", "partition1", 500);
+    assertEquals(500, bucketInfoWithUpdates.getNumUpdates());
+
+    BucketInfo insertBucket = new BucketInfo(BucketType.INSERT, "file2", "partition1", 0);
+    assertEquals(0, insertBucket.getNumUpdates());
Review Comment:
_⚠️ Potential issue_ | _🟠 Major_
<details>
<summary>🧩 Analysis chain</summary>
🏁 Script executed:
```shell
#!/bin/bash
# Expected result: no output from non-test sources unless an UPDATE bucket
# is intentionally unknown.
python - <<'PY'
from pathlib import Path


def iter_calls(text, needle="new BucketInfo("):
    """Yield (offset, source text) for each `new BucketInfo(...)` call."""
    i = 0
    while True:
        start = text.find(needle, i)
        if start < 0:
            return
        j = start + len(needle)
        depth = 1
        in_str = None
        # Scan to the matching closing parenthesis, skipping string literals.
        while j < len(text) and depth:
            ch = text[j]
            if in_str:
                if ch == '\\':
                    j += 2
                    continue
                if ch == in_str:
                    in_str = None
            else:
                if ch in ("'", '"'):
                    in_str = ch
                elif ch == '(':
                    depth += 1
                elif ch == ')':
                    depth -= 1
            j += 1
        yield start, text[start:j]
        i = j


def arg_count(call):
    """Count top-level arguments by counting commas outside nested brackets."""
    args = call[call.find('(') + 1:-1]
    depth = 0
    in_str = None
    commas = 0
    i = 0
    while i < len(args):
        ch = args[i]
        if in_str:
            if ch == '\\':
                i += 2
                continue
            if ch == in_str:
                in_str = None
        else:
            if ch in ("'", '"'):
                in_str = ch
            elif ch in "([{<":
                depth += 1
            elif ch in ")]}>":
                depth -= 1
            elif ch == ',' and depth == 0:
                commas += 1
        i += 1
    return 0 if not args.strip() else commas + 1


# Report every non-test UPDATE bucket built with the 3-arg constructor.
for path in Path('.').rglob('*.java'):
    spath = str(path).replace("\\", "/")
    if "/src/test/" in spath:
        continue
    text = path.read_text(errors='ignore')
    for start, call in iter_calls(text):
        if "BucketType.UPDATE" in call and arg_count(call) == 3:
            line = text.count('\n', 0, start) + 1
            print(f"{spath}:{line}: {call.replace(chr(10), ' ')}")
PY
```
Repository: yihua/hudi
Length of output: 1639
---
</details>
**Fix production UPDATE bucket sites still using the 3-arg constructor.**
Eight production call sites still create UPDATE buckets with the 3-arg
`BucketInfo` constructor, which silently defaults `numUpdates` to `-1`:
- `hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java:123`
- `hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkBucketIndexBucketInfoGetter.java:59,63`
- `hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableFirstDeltaCommitActionExecutor.java:87`
- `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/update/strategy/ConsistentBucketUpdateStrategy.java:130`
- `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java:127,140,162`

Since `handleUpdate(...)` now forwards `getNumUpdates()` to `MergeContext`,
these call sites always report `-1` (unknown), so `MergeContext` never receives
a real update count from them. Pass an explicit count, or document why `-1`
is intentional.
— *CodeRabbit*
([original](https://github.com/yihua/hudi/pull/14#discussion_r3036299135))
(source:comment#3036299135)
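To illustrate the distinction the comment above draws, here is a minimal sketch under stated assumptions. `SketchBucketInfo`, its nested `Type` enum, the field names, and `knownNumUpdates()` are hypothetical stand-ins and not Hudi's actual `BucketInfo` API; only the `-1` default and the 3-arg versus 4-arg constructor shapes come from the test in this PR.

```java
import java.util.OptionalLong;

/** Hypothetical stand-in for BucketInfo; models only what the review discusses. */
final class SketchBucketInfo {
  /** Hypothetical stand-in for BucketType. */
  enum Type { UPDATE, INSERT }

  /** Sentinel for "update count unknown", matching the default asserted in the test. */
  static final long UNKNOWN_NUM_UPDATES = -1L;

  private final Type bucketType;
  private final String fileIdPrefix;   // illustrative field name
  private final String partitionPath;  // illustrative field name
  private final long numUpdates;

  /** 3-arg form: the count silently falls back to the sentinel. */
  SketchBucketInfo(Type bucketType, String fileIdPrefix, String partitionPath) {
    this(bucketType, fileIdPrefix, partitionPath, UNKNOWN_NUM_UPDATES);
  }

  /** 4-arg form: the caller states the count explicitly. */
  SketchBucketInfo(Type bucketType, String fileIdPrefix, String partitionPath, long numUpdates) {
    this.bucketType = bucketType;
    this.fileIdPrefix = fileIdPrefix;
    this.partitionPath = partitionPath;
    this.numUpdates = numUpdates;
  }

  long getNumUpdates() {
    return numUpdates;
  }

  /** Exposes the count only when it is actually known (assumed helper, not in the PR). */
  OptionalLong knownNumUpdates() {
    return numUpdates >= 0 ? OptionalLong.of(numUpdates) : OptionalLong.empty();
  }

  public static void main(String[] args) {
    SketchBucketInfo legacy = new SketchBucketInfo(Type.UPDATE, "file1", "partition1");
    SketchBucketInfo explicit = new SketchBucketInfo(Type.UPDATE, "file1", "partition1", 500);
    System.out.println(legacy.knownNumUpdates().isPresent());    // false
    System.out.println(explicit.knownNumUpdates().getAsLong());  // 500
  }
}
```

A consumer that reads the raw `getNumUpdates()` value has to remember the `-1` convention itself; an accessor in the style of `knownNumUpdates()` is one possible way to make the unknown case hard to miss, though whether that fits Hudi's code is a design decision for the PR author.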
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java:
##########
@@ -137,7 +137,7 @@ public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, Hood
                                 Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
                                 HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier,
                                 Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, dataFileToBeMerged, keyGeneratorOpt,
+    super(config, instantTime, hoodieTable, MergeContext.create(Collections.emptyIterator()), partitionPath, fileId, taskContextSupplier, dataFileToBeMerged, keyGeneratorOpt,
         // preserveMetadata is disabled by default for MDT but enabled otherwise
         !HoodieTableMetadata.isMetadataTable(config.getBasePath()));
     this.keyToNewRecords = keyToNewRecords;
Review Comment:
**P2: `MergeContext.create(Collections.emptyIterator())` loses
`numIncomingUpdates` in MDT path**
In the internal constructor used for Metadata Table (MDT) writes, a fresh
`MergeContext` is created with an empty iterator and `numIncomingUpdates = -1L`,
even though `keyToNewRecords` (passed in by the caller) is already populated.
The behavior is correct, because the MDT path supplies a pre-built map rather
than an iterator, but it means this path always reports
`numIncomingUpdates = -1`. If the MDT path ever needs accurate counts, its
callers will have to be updated.
This is not an immediate bug, but the comment or Javadoc on this constructor
could state the "always -1" semantics explicitly for maintainers.
— *Greptile*
([original](https://github.com/yihua/hudi/pull/14#discussion_r3036291940))
(source:comment#3036291940)
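As a rough idea of what such Javadoc could look like, here is a sketch under stated assumptions. `SketchMergeContext` is a hypothetical stand-in, not Hudi's `MergeContext`; the two-argument `create` overload, the constant name, and `hasKnownUpdateCount()` are assumptions made for illustration. Only the single-argument `create(Collections.emptyIterator())` call and the `-1L` value appear in this PR.

```java
import java.util.Collections;
import java.util.Iterator;

/** Hypothetical stand-in for MergeContext; models only the incoming-update count. */
final class SketchMergeContext<T> {
  /** Sentinel meaning the number of incoming updates is not known. */
  static final long UNKNOWN_NUM_INCOMING_UPDATES = -1L;

  private final Iterator<T> records;
  private final long numIncomingUpdates;

  private SketchMergeContext(Iterator<T> records, long numIncomingUpdates) {
    this.records = records;
    this.numIncomingUpdates = numIncomingUpdates;
  }

  /**
   * Creates a context whose incoming-update count is always
   * {@link #UNKNOWN_NUM_INCOMING_UPDATES}.
   *
   * <p>Intended for callers that already hold a pre-built key-to-record map
   * (for example the metadata table write path) and therefore pass an empty
   * iterator. Such callers must not rely on {@link #getNumIncomingUpdates()}.
   */
  static <T> SketchMergeContext<T> create(Iterator<T> records) {
    return new SketchMergeContext<>(records, UNKNOWN_NUM_INCOMING_UPDATES);
  }

  /** Creates a context with a caller-supplied count (assumed overload). */
  static <T> SketchMergeContext<T> create(Iterator<T> records, long numIncomingUpdates) {
    return new SketchMergeContext<>(records, numIncomingUpdates);
  }

  Iterator<T> getRecords() {
    return records;
  }

  long getNumIncomingUpdates() {
    return numIncomingUpdates;
  }

  /** True only when a real, non-negative count was supplied. */
  boolean hasKnownUpdateCount() {
    return numIncomingUpdates >= 0;
  }

  public static void main(String[] args) {
    SketchMergeContext<String> mdt = SketchMergeContext.create(Collections.emptyIterator());
    System.out.println(mdt.getNumIncomingUpdates());  // -1
    System.out.println(mdt.hasKnownUpdateCount());    // false
  }
}
```

Naming the sentinel and stating the "always unknown" contract on the factory keeps the semantics next to the code that produces them, which is the kind of maintainer hint the comment above asks for.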