hudi-agent commented on code in PR #18768:
URL: https://github.com/apache/hudi/pull/18768#discussion_r3272599045


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/keyedstate/MockKeyedMapState.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils.keyedstate;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.hudi.sink.utils.MockMapState;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Mock keyed map state for testing.
+ * @param <K> Type of state key
+ * @param <V> Type of state value
+ */
+public class MockKeyedMapState<K, V> implements MapState<K, V>, Cloneable {
+  private MockKeyContext keyContext;
+  private Map<Object, MockMapState<K, V>> mockMapStateMap;
+
+  public MockKeyedMapState(MockKeyContext keyContext) {
+    Objects.requireNonNull(keyContext, "keyContext is null");
+    this.keyContext = keyContext;
+  }

Review Comment:
   🤖 Same issue as `MockKeyedListState` — `mockMapStateMap` is never 
initialized in the constructor, so the first call to any method 
(get/put/contains/etc.) will NPE on `mockMapStateMap.computeIfAbsent(...)`. 
Could you add `this.mockMapStateMap = new HashMap<>();` here?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/keyedstate/MockKeyedListState.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils.keyedstate;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Mock keyed list state for testing.
+ * @param <T> Type of state value
+ */
+public class MockKeyedListState<T> implements ListState<T>, Cloneable  {
+  private MockKeyContext keyContext;
+  private Map<Object, TestUtils.MockListState<T>> mockListStateMap;
+
+  public MockKeyedListState(MockKeyContext keyContext) {
+    Objects.requireNonNull(keyContext, "keyContext is null");
+    this.keyContext = keyContext;
+  }

Review Comment:
   🤖 `mockListStateMap` is declared on line 34 but never initialized — the 
constructor only assigns `keyContext`. Any call to 
`update`/`add`/`get`/`addAll`/`clear` will NPE at the 
`mockListStateMap.computeIfAbsent(...)` line. Compare with 
`MockKeyedValueState` which initializes `this.keyValueStateMap = new 
HashMap<>()` in its constructor. Could you add `this.mockListStateMap = new 
HashMap<>();` here?
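A minimal sketch of the fix, assuming a simplified key context. `KeyContextStub` and `KeyedListStateSketch` are hypothetical stand-ins for `MockKeyContext`/`MockKeyedListState` and do not implement Flink's `ListState`. Initializing the per-key map at its declaration (and making it `final`) removes the NPE no matter which constructor runs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for MockKeyContext: holds the key of the record being processed. */
class KeyContextStub {
  private Object currentKey;

  void setCurrentKey(Object key) {
    this.currentKey = key;
  }

  Object getCurrentKey() {
    return currentKey;
  }
}

/** Hypothetical simplified keyed list state: one list per key, selected via the key context. */
class KeyedListStateSketch<T> {
  private final KeyContextStub keyContext;
  // Initialized eagerly, so computeIfAbsent below can never run against a null map.
  private final Map<Object, List<T>> stateByKey = new HashMap<>();

  KeyedListStateSketch(KeyContextStub keyContext) {
    this.keyContext = Objects.requireNonNull(keyContext, "keyContext is null");
  }

  private List<T> current() {
    return stateByKey.computeIfAbsent(keyContext.getCurrentKey(), k -> new ArrayList<>());
  }

  void add(T value) {
    current().add(value);
  }

  List<T> get() {
    return current();
  }

  void clear() {
    stateByKey.remove(keyContext.getCurrentKey());
  }
}
```

The same one-line change (`= new HashMap<>()` on a `final` field) applies to `mockMapStateMap` in `MockKeyedMapState`.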
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java:
##########
@@ -53,9 +53,13 @@ protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context
   }
 
   public static <T> HoodieFlinkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
+    return create(config, context, false);

Review Comment:
   🤖 This silently changes the default `loadActiveTimelineOnLoad` from `true` 
to `false` for every caller of the 2-arg `create(config, context)`. That 
includes `HoodieFlinkWriteClient.getHoodieTable()`, 
`FlinkUpgradeDowngradeHelper.getTable()`, `WriteProfile.getTable()`, and 
`FlinkTables.createTableInternal`. The new lazy default is the right intent for 
the per-task write client path, but `FlinkUpgradeDowngradeHelper` (and arguably 
`WriteProfile`) likely expect the timeline to be loaded - upgrade/downgrade 
reasons over completed instants, and `WriteProfile` walks 
`getCommitsTimeline()`. Was this intentional for those callers, or would it be 
safer to keep the 2-arg version returning `true` and have only 
`HoodieFlinkWriteClient.getHoodieTable(false)` opt in to lazy loading?
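A sketch of that alternative, using a hypothetical `FlinkTableSketch` in place of `HoodieFlinkTable` (config and context reduced to strings): the 2-arg overload keeps the eager default, and only callers that want lazy loading pass `false` explicitly.

```java
/** Hypothetical stand-in for HoodieFlinkTable; it only records the timeline-loading flag. */
final class FlinkTableSketch {
  private final boolean loadActiveTimelineOnLoad;

  private FlinkTableSketch(boolean loadActiveTimelineOnLoad) {
    this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
  }

  /** Existing 2-arg entry point: keeps the previous eager behavior for every current caller. */
  static FlinkTableSketch create(String config, String context) {
    return create(config, context, true);
  }

  /** New overload: a caller such as the per-task write client opts in to lazy loading. */
  static FlinkTableSketch create(String config, String context, boolean loadActiveTimelineOnLoad) {
    return new FlinkTableSketch(loadActiveTimelineOnLoad);
  }

  boolean loadsActiveTimelineOnLoad() {
    return loadActiveTimelineOnLoad;
  }
}
```

With this shape, `FlinkUpgradeDowngradeHelper` and `WriteProfile` keep their current semantics without any change at the call sites.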
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -312,12 +351,48 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
     this.gateways[i] = subtaskGateway;
   }
 
+  @Override
+  public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
+    CompletableFuture<CoordinationResponse> response = new CompletableFuture<>();
+    instantRequestExecutor.execute(() -> {
+      Correspondent.InstantTimeRequest instantTimeRequest = (Correspondent.InstantTimeRequest) request;
+      long checkpointId = instantTimeRequest.getCheckpointId();
+      Pair<String, WriteMetadataEvent[]> instantTimeAndEventBuffer = this.eventBuffers.getInstantAndEventBuffer(checkpointId);
+      final String instantTime;
+      if (instantTimeAndEventBuffer == null) {
+        // wait until previous instants are committed.
+        eventBuffers.awaitAllInstantsToCompleteIfNecessary();
+        instantTime = startInstant();
+        this.eventBuffers.initNewEventBuffer(checkpointId, instantTime, this.parallelism);
+      } else {
+        instantTime = instantTimeAndEventBuffer.getLeft();

Review Comment:
   🤖 If `startInstant()` throws here (e.g. the timeline transition fails, or the 
lock can't be acquired), the `response` future is never completed, because the 
body is not wrapped in a try/catch that completes it exceptionally. The same 
applies to the `HoodieException` that `awaitAllInstantsToCompleteIfNecessary()` 
throws when `CommitGuard` times out. The executor's exception hook will call 
`context.failJob(...)`, but until the JM tears the task down, the writer stays 
blocked on `response.get()` inside `Correspondent.requestInstantTime`. Could we 
wrap this body in `try { ... } catch (Throwable t) { 
response.completeExceptionally(t); throw t; }` so the writer fails fast instead 
of hanging?
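A self-contained sketch of the suggested try/catch, assuming a plain `ExecutorService` in place of `instantRequestExecutor` and a hypothetical `Supplier` standing in for the `awaitAllInstantsToCompleteIfNecessary()` + `startInstant()` steps (request and response types reduced to `String`):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

final class InstantRequestSketch {
  /**
   * Hypothetical handler: computes the instant on the executor and always completes the
   * future, normally or exceptionally, so the requesting writer never waits forever.
   */
  static CompletableFuture<String> handle(ExecutorService executor, Supplier<String> startInstant) {
    CompletableFuture<String> response = new CompletableFuture<>();
    executor.execute(() -> {
      try {
        response.complete(startInstant.get());
      } catch (Throwable t) {
        // Unblock the waiting caller first, then rethrow so the executor's own
        // failure handling (failJob in the PR) still sees the error.
        response.completeExceptionally(t);
        throw t; // precise rethrow: the try body throws no checked exceptions
      }
    });
    return response;
  }
}
```

The caller's `get()` then surfaces the failure as an `ExecutionException` whose cause is the original error, instead of blocking until the task is cancelled.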
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -582,7 +582,7 @@ private FlinkOptions() {
   public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions
       .key("write.commit.ack.timeout")
       .longType()
-      .defaultValue(-1L) // default at least once
+      .defaultValue(300_000L)

Review Comment:
   🤖 Changing the default from `-1` (at-least-once / wait indefinitely) to 
`300_000` is a meaningful behavior change for existing users. The new code also 
passes this value straight to `CommitGuard.create(...)`, and 
`Condition.await(-1, MILLISECONDS)` returns `false` immediately, so anyone who 
explicitly set the old default of `-1` in their config will start getting 
`HoodieException: Timeout(-1ms)` errors. Could we either: (a) call this out in 
the PR description as a breaking change for users with an explicit `-1`, or (b) 
treat `-1` as "no timeout" inside `CommitGuard` (skip the timed `await` and use 
the untimed one)?
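A small sketch of both the current behavior and option (b). `awaitAck` is a hypothetical helper, not an existing `CommitGuard` method. With a `ReentrantLock` condition, a timed `await` with a non-positive timeout reports `false` without waiting for a signal, while the helper falls back to the untimed `await()` for `-1`:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class CommitAckTimeoutSketch {
  /**
   * Hypothetical option (b): a non-positive timeout means "no timeout". Must be called while
   * holding the lock that created the condition. Returns false only if a positive timeout elapsed.
   */
  static boolean awaitAck(Condition condition, long commitAckTimeoutMs) throws InterruptedException {
    if (commitAckTimeoutMs <= 0) {
      condition.await(); // untimed: no deadline, returns once signaled (or spuriously woken)
      return true;
    }
    return condition.await(commitAckTimeoutMs, TimeUnit.MILLISECONDS);
  }

  /** Current behavior: a timed await with -1 ms reports a timeout without any signal. */
  static boolean timedAwaitWithMinusOne() throws InterruptedException {
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    lock.lock();
    try {
      return condition.await(-1, TimeUnit.MILLISECONDS);
    } finally {
      lock.unlock();
    }
  }
}
```

In the real guard the untimed branch would still belong inside a predicate loop, like any other `Condition` wait.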
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java:
##########


Review Comment:
   🤖 **Line 269:** This silently overrides the user-configured 
`hoodie.cleaner.policy.failed.writes` to LAZY whenever non-blocking instant 
generation is enabled, with no log message. If a user explicitly set EAGER 
they'll likely be confused when behavior doesn't match their config. Could you 
at least log a warning when overriding a user-provided value (e.g. when the 
conf already contains the key with a non-LAZY value)? Or consider validating 
up-front and failing fast with a clear error.
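A minimal sketch of the "at least log it" option, assuming the options are available as a plain `Map<String, String>` (the real code works on a Flink `Configuration`); `forceLazyFailedWritesCleaning` is a hypothetical helper name:

```java
import java.util.Map;
import java.util.logging.Logger;

final class FailedWritesPolicySketch {
  private static final Logger LOG = Logger.getLogger(FailedWritesPolicySketch.class.getName());
  static final String FAILED_WRITES_POLICY_KEY = "hoodie.cleaner.policy.failed.writes";

  /**
   * Forces LAZY cleaning of failed writes and logs a warning when that overrides an explicit
   * user setting. Returns true if a user-provided non-LAZY value was overridden.
   */
  static boolean forceLazyFailedWritesCleaning(Map<String, String> options) {
    String userValue = options.get(FAILED_WRITES_POLICY_KEY);
    boolean overridden = userValue != null && !"LAZY".equalsIgnoreCase(userValue);
    if (overridden) {
      LOG.warning("Overriding " + FAILED_WRITES_POLICY_KEY + "=" + userValue
          + " with LAZY because non-blocking instant generation is enabled");
    }
    options.put(FAILED_WRITES_POLICY_KEY, "LAZY");
    return overridden;
  }
}
```

The fail-fast variant would throw at the point where this sketch logs, before the write client is created.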
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Utilities for coordinator event buffer.
+ */
+public class EventBuffers implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  // {checkpointId -> (instant, events)}
+  private final Map<Long, Pair<String, WriteMetadataEvent[]>> eventBuffers;
+  private final Option<CommitGuard> commitGuardOption;
+
+  private EventBuffers(Map<Long, Pair<String, WriteMetadataEvent[]>> eventBuffers, Option<CommitGuard> commitGuardOption) {
+    this.eventBuffers = eventBuffers;
+    this.commitGuardOption = commitGuardOption;
+  }
+
+  public static EventBuffers getInstance(Configuration conf) {
+    final Option<CommitGuard> commitGuardOpt = OptionsResolver.isBlockingInstantGeneration(conf)
+        ? Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))) : Option.empty();
+    return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt);
+  }
+
+  /**
+   * Checks the buffer is ready to commit.
+   */
+  public static boolean allEventsReceived(WriteMetadataEvent[] eventBuffer) {
+    return Arrays.stream(eventBuffer)
+        // we do not use event.isReady to check the instant
+        // because the write task may send an event eagerly for empty
+        // data set, the event may have a timestamp of last committed instant.
+        .allMatch(event -> event != null && event.isLastBatch());
+  }
+
+  public WriteMetadataEvent[] addEventToBuffer(WriteMetadataEvent event) {
+    WriteMetadataEvent[] eventBuffer = this.eventBuffers.get(event.getCheckpointId()).getRight();
+    if (eventBuffer[event.getTaskID()] != null
+        && eventBuffer[event.getTaskID()].getInstantTime().equals(event.getInstantTime())) {
+      eventBuffer[event.getTaskID()].mergeWith(event);
+    } else {
+      eventBuffer[event.getTaskID()] = event;
+    }
+    return eventBuffer;
+  }
+
+  public void addEventsToBuffer(Map<Long, Pair<String, WriteMetadataEvent[]>> events) {
+    this.eventBuffers.putAll(events);
+  }
+
+  /**
+   * Returns existing bootstrap buffer or creates a new one.
+   */
+  public WriteMetadataEvent[] getOrCreateBootstrapBuffer(WriteMetadataEvent event, int parallelism) {
+    return this.eventBuffers.computeIfAbsent(event.getCheckpointId(),
+        ckpId -> Pair.of(event.getInstantTime(), new WriteMetadataEvent[parallelism])).getRight();
+  }
+
+  /**
+   * Cleans the task events triggered after or on the given event checkpoint ID.
+   */
+  public void cleanLegacyEvents(WriteMetadataEvent event) {
+    this.eventBuffers.entrySet().stream()
+        .filter(entry -> entry.getKey().compareTo(event.getCheckpointId()) >= 0)
+        .map(entry -> entry.getValue().getRight())
+        .forEach(eventBuffer -> resetBufferAt(eventBuffer, event.getTaskID()));
+  }
+
+  private static void resetBufferAt(WriteMetadataEvent[] eventBuffer, int idx) {
+    if (eventBuffer.length > idx) {
+      eventBuffer[idx] = null;
+    }
+  }
+
+  /**
+   * Returns the pair of instant time and event buffer.
+   */
+  public Pair<String, WriteMetadataEvent[]> getInstantAndEventBuffer(long checkpointId) {
+    return this.eventBuffers.get(checkpointId);
+  }
+
+  public WriteMetadataEvent[] getLatestEventBuffer(String instantTime) {
+    return eventBuffers.values().stream().filter(val -> val.getLeft().equals(instantTime)).findFirst().map(Pair::getRight).orElse(null);
+  }
+
+  public WriteMetadataEvent[] getEventBuffer(long checkpointId) {
+    return Option.ofNullable(this.eventBuffers.get(checkpointId)).map(Pair::getRight).orElse(null);
+  }
+
+  public Stream<Map.Entry<Long, Pair<String, WriteMetadataEvent[]>>> getEventBufferStream() {
+    return this.eventBuffers.entrySet().stream();
+  }
+
+  public void initNewEventBuffer(long checkpointId, String instantTime, int parallelism) {
+    this.eventBuffers.put(checkpointId, Pair.of(instantTime, new WriteMetadataEvent[parallelism]));
+  }
+
+  public void awaitAllInstantsToCompleteIfNecessary() {
+    if (this.commitGuardOption.isPresent() && nonEmpty()) {
+      this.commitGuardOption.get().blockFor(getPendingInstants());
+    }
+  }
+
+  public void reset(long checkpointId) {
+    this.eventBuffers.remove(checkpointId);
+    this.commitGuardOption.ifPresent(CommitGuard::unblock);
+  }
+
+  public boolean nonEmpty() {
+    return this.eventBuffers.values().stream()
+        .map(Pair::getValue)
+        .flatMap(Arrays::stream).anyMatch(Objects::nonNull);
+  }
+
+  public String getPendingInstants() {
+    return this.eventBuffers.values().stream().map(Pair::getKey).collect(Collectors.joining(","));
+  }
+
+  /**
+   * Get write metadata events where there exists no event sent by eager flushing from writers.
+   */
+  public Map<Long, Pair<String, WriteMetadataEvent[]>> getAllCompletedEvents() {

Review Comment:
   🤖 nit: `getAllCompletedEvents()` could mislead future readers into thinking 
these are events for Hudi-completed (committed) instants — the Javadoc says it 
filters out in-flight eager-flush entries, so something like 
`getCheckpointableBuffers()` or `getAllLastBatchBuffers()` might signal the 
intent more clearly.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java:
##########


Review Comment:
   🤖 **Line 306:** `AssertionError` extends `Error`, not `Exception`, so if 
`checkWrittenData(expected, 1)` fails inside `t1`, the catch block won't catch 
it. The thread then terminates with an uncaught error (by default only printed 
to stderr), `t1.join()` returns normally, and the test passes despite the failed 
assertion. This may be related to why 
`TestWriteMergeOnReadWithCompact.testBlockedInstantTimeRequest` is documented as 
flaky. Could you catch `Throwable` and store it in an `AtomicReference` for the 
main thread to rethrow after `join()`?
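A self-contained sketch of that pattern; `runAndRethrow` is a hypothetical helper, and the `Runnable` stands in for the body of `t1` (e.g. the `checkWrittenData(expected, 1)` call):

```java
import java.util.concurrent.atomic.AtomicReference;

final class ThreadFailureSketch {
  /**
   * Runs the body on a separate thread and rethrows anything it threw (including
   * AssertionError) on the calling thread after join(), so a failed assertion fails the test.
   */
  static void runAndRethrow(Runnable body) throws InterruptedException {
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Thread t1 = new Thread(() -> {
      try {
        body.run();
      } catch (Throwable t) { // also catches Error subclasses such as AssertionError
        failure.set(t);
      }
    });
    t1.start();
    t1.join();
    Throwable t = failure.get();
    if (t instanceof Error) {
      throw (Error) t;
    }
    if (t instanceof RuntimeException) {
      throw (RuntimeException) t;
    }
    if (t != null) {
      throw new RuntimeException(t);
    }
  }
}
```

Submitting the body to an `ExecutorService` and calling `Future.get()` is an equivalent alternative; the assertion then arrives wrapped in an `ExecutionException`.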
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java:
##########


Review Comment:
   🤖 **Line 185:** nit: `writeConfig` is assigned here but never used after 
this point — it looks like a leftover from the `CkpMetadata` replacement. Could 
you remove it (or actually use it if something depends on this call)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java:
##########


Review Comment:
   🤖 **Line 556:** nit: in `assertDataFilesExists` the subject "files" is 
plural, so `assertDataFilesExist` might read more naturally (the sibling 
`assertEmptyBaseFiles` has no verb and so avoids the agreement issue entirely).
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The commit guard used for blocking instant time generation.
+ *
+ * <p>A new instant time can not be generated until all the pending instants are committed.
+ */
+public class CommitGuard {
+  /**
+   * A lock used to block instant generation until being signaled after pending instant is committed.
+   */
+  private final Lock lock;
+  private final Condition condition;
+  private final long commitAckTimeout;
+
+  private CommitGuard(long commitAckTimeout) {
+    this.lock = new ReentrantLock();
+    this.condition = lock.newCondition();
+    this.commitAckTimeout = commitAckTimeout;
+  }
+
+  public static CommitGuard create(long commitAckTimeout) {
+    return new CommitGuard(commitAckTimeout);
+  }
+
+  /**
+   * Wait until all the pending instants are committed.
+   *
+   * @param instants Current pending instants
+   */
+  public void blockFor(String instants) {
+    lock.lock();
+    try {
+      if (!condition.await(commitAckTimeout, TimeUnit.MILLISECONDS)) {
+        throw new HoodieException("Timeout(" + commitAckTimeout + "ms) while waiting for instants [" + instants + "] to commit");
+      }

Review Comment:
   🤖 This `await` isn't guarded by a predicate, which exposes two issues: (1) 
if `unblock()` runs before another thread enters `await()`, the signal is lost, 
and the waiter blocks for the full `commitAckTimeout` and then fails with the 
timeout exception; (2) every `EventBuffers.reset(...)` calls `unblock()`, so if 
multiple checkpoints are pending, the first commit wakes the waiter and 
`blockFor` returns even though other instants are still pending. The 
`Condition` contract also permits spurious wakeups, which have the same effect 
as (2). For CoW upsert (where `isBlockingInstantGeneration` is true), that 
breaks the intended serialization across instants. Could we re-check a 
pending-instants predicate in a loop (e.g. a supplier backed by 
`EventBuffers#nonEmpty`), so the waiter only proceeds once all pending instants 
have actually drained? @danny0405
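A sketch of a predicate-guarded variant, assuming a positive timeout and that callers update the pending state before calling `unblock()`. `PredicateCommitGuard` and its `BooleanSupplier` parameter are hypothetical (not part of the PR's `CommitGuard`), and `IllegalStateException` stands in for `HoodieException`:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

final class PredicateCommitGuard {
  private final Lock lock = new ReentrantLock();
  private final Condition condition = lock.newCondition();
  private final long commitAckTimeoutMs; // assumed > 0 in this sketch

  PredicateCommitGuard(long commitAckTimeoutMs) {
    this.commitAckTimeoutMs = commitAckTimeoutMs;
  }

  /** Waits until hasPending reports false, re-checking after every wakeup, or throws on timeout. */
  void blockWhile(BooleanSupplier hasPending, String instants) throws InterruptedException {
    lock.lock();
    try {
      long remainingNanos = TimeUnit.MILLISECONDS.toNanos(commitAckTimeoutMs);
      // The loop covers an early signal (state already drained: no wait at all),
      // partial progress (one commit out of several) and spurious wakeups.
      while (hasPending.getAsBoolean()) {
        if (remainingNanos <= 0L) {
          throw new IllegalStateException("Timeout(" + commitAckTimeoutMs
              + "ms) while waiting for instants [" + instants + "] to commit");
        }
        remainingNanos = condition.awaitNanos(remainingNanos);
      }
    } finally {
      lock.unlock();
    }
  }

  /** Call after removing a committed instant from the pending state. */
  void unblock() {
    lock.lock();
    try {
      condition.signalAll();
    } finally {
      lock.unlock();
    }
  }
}
```

Checking the predicate and waiting under the same lock is what makes an early `unblock()` harmless: the waiter sees the already-updated state and never parks.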
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/ExplicitClassloaderThreadFactory.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.adapter.Utils;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * A thread factory with explicit classloader.
+ */
+public class ExplicitClassloaderThreadFactory implements ThreadFactory {
+  private final String threadName;
+  private final ClassLoader cl;
+  private final Thread.UncaughtExceptionHandler errorHandler;
+
+  private Thread t;

Review Comment:
   🤖 nit: `t` is a pretty opaque name for the field tracking the single created 
thread — `thread` (or `singleThread`) would make the `if (t != null)` guard in 
`newThread` immediately readable.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/keyedstate/MockKeyedValueState.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils.keyedstate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.hudi.sink.utils.MockValueState;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Mock keyed list state for testing.
+ * @param <V> Type of state value

Review Comment:
   🤖 nit: the Javadoc here says "Mock keyed **list** state for testing" — looks 
like a copy-paste from `MockKeyedListState`. Could you update it to "Mock keyed 
value state for testing"?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java:
##########
@@ -108,6 +111,14 @@ public void setTaskID(int taskID) {
     this.taskID = taskID;
   }
 
+  public Long getCheckpointId() {

Review Comment:
   🤖 The field is primitive `long` but the getter returns boxed `Long`. This 
works via autoboxing but may allocate on every call, and it can surprise callers 
who compare two results with `==` (e.g. `event.getCheckpointId() == 
other.getCheckpointId()`), which is reference equality on `Long`. Could we make 
this `public long getCheckpointId()` for consistency with the field type and 
the setter signature?
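A minimal illustration using hypothetical `BoxedEvent`/`PrimitiveEvent` classes rather than `WriteMetadataEvent`: with a primitive return type `==` always compares values, while `==` between two boxed `Long`s compares references and is only guaranteed to match for the range -128..127 that `Long.valueOf` always caches.

```java
final class CheckpointIdSketch {
  /** Hypothetical event exposing the checkpoint ID as a boxed Long (current getter shape). */
  static final class BoxedEvent {
    private final long checkpointId;

    BoxedEvent(long checkpointId) {
      this.checkpointId = checkpointId;
    }

    Long getCheckpointId() {
      return checkpointId; // autoboxed via Long.valueOf on every call
    }
  }

  /** Hypothetical event exposing the checkpoint ID as a primitive long (suggested shape). */
  static final class PrimitiveEvent {
    private final long checkpointId;

    PrimitiveEvent(long checkpointId) {
      this.checkpointId = checkpointId;
    }

    long getCheckpointId() {
      return checkpointId;
    }
  }

  /** Reference comparison: only guaranteed true for IDs in [-128, 127]. */
  static boolean sameIdBoxed(BoxedEvent a, BoxedEvent b) {
    return a.getCheckpointId() == b.getCheckpointId();
  }

  /** Value comparison: always correct. */
  static boolean sameIdPrimitive(PrimitiveEvent a, PrimitiveEvent b) {
    return a.getCheckpointId() == b.getCheckpointId();
  }
}
```

On OpenJDK, `sameIdBoxed` typically returns `false` for two events with checkpoint ID `1000`, because each call boxes into a fresh `Long`.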
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
