hudi-agent commented on code in PR #18768:
URL: https://github.com/apache/hudi/pull/18768#discussion_r3272599045

##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/keyedstate/MockKeyedMapState.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils.keyedstate;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.hudi.sink.utils.MockMapState;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Mock keyed map state for testing.
+ * @param <K> Type of state key
+ * @param <V> Type of state value
+ */
+public class MockKeyedMapState<K, V> implements MapState<K, V>, Cloneable {
+  private MockKeyContext keyContext;
+  private Map<Object, MockMapState<K, V>> mockMapStateMap;
+
+  public MockKeyedMapState(MockKeyContext keyContext) {
+    Objects.requireNonNull(keyContext, "keyContext is null");
+    this.keyContext = keyContext;
+  }

Review Comment:
🤖 Same issue as `MockKeyedListState`: `mockMapStateMap` is never initialized in the constructor, so the first call to any method (`get`/`put`/`contains`/etc.) will NPE on `mockMapStateMap.computeIfAbsent(...)`. Could you add `this.mockMapStateMap = new HashMap<>();` here, along with the missing `import java.util.HashMap;`?
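A minimal sketch of the suggested fix, assuming a simplified key context. `SimpleKeyContext` and `KeyedMapStateSketch` are hypothetical stand-ins for `MockKeyContext` and `MockKeyedMapState`, not the PR's classes, and they skip Flink's `MapState` interface entirely. Creating the per-key map in the constructor is what keeps the first `computeIfAbsent(...)` from hitting a null field; the same one-line change applies to `MockKeyedListState`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for MockKeyContext: holds the key the test has selected.
class SimpleKeyContext {
  private Object currentKey;

  void setCurrentKey(Object key) {
    this.currentKey = key;
  }

  Object getCurrentKey() {
    return currentKey;
  }
}

// Hypothetical simplified keyed map state: one inner map per current key.
class KeyedMapStateSketch<K, V> {
  private final SimpleKeyContext keyContext;
  // Initialized eagerly so computeIfAbsent below never runs against a null map.
  private final Map<Object, Map<K, V>> stateByKey;

  KeyedMapStateSketch(SimpleKeyContext keyContext) {
    this.keyContext = Objects.requireNonNull(keyContext, "keyContext is null");
    this.stateByKey = new HashMap<>();
  }

  // Returns the inner map for the current key, creating it on first access.
  private Map<K, V> currentState() {
    return stateByKey.computeIfAbsent(keyContext.getCurrentKey(), k -> new HashMap<>());
  }

  V get(K key) {
    return currentState().get(key);
  }

  void put(K key, V value) {
    currentState().put(key, value);
  }

  boolean contains(K key) {
    return currentState().containsKey(key);
  }
}
```

Declaring the field `final`, as the sketch does, also lets the compiler reject any constructor that forgets to assign it.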
- AI-generated; verify before applying.

##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/keyedstate/MockKeyedListState.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils.keyedstate;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Mock keyed list state for testing.
+ * @param <T> Type of state value
+ */
+public class MockKeyedListState<T> implements ListState<T>, Cloneable {
+  private MockKeyContext keyContext;
+  private Map<Object, TestUtils.MockListState<T>> mockListStateMap;
+
+  public MockKeyedListState(MockKeyContext keyContext) {
+    Objects.requireNonNull(keyContext, "keyContext is null");
+    this.keyContext = keyContext;
+  }

Review Comment:
🤖 `mockListStateMap` is declared on line 34 but never initialized; the constructor only assigns `keyContext`. Any call to `update`/`add`/`get`/`addAll`/`clear` will NPE at the `mockListStateMap.computeIfAbsent(...)` line.

Compare with `MockKeyedValueState`, which initializes `this.keyValueStateMap = new HashMap<>()` in its constructor. Could you add `this.mockListStateMap = new HashMap<>();` here (plus `import java.util.HashMap;`, which this file does not import yet)?

##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java:
##########
@@ -53,9 +53,13 @@ protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context
   }
   public static <T> HoodieFlinkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
+    return create(config, context, false);

Review Comment:
🤖 This silently changes the default `loadActiveTimelineOnLoad` from `true` to `false` for every caller of the 2-arg `create(config, context)`. That includes `HoodieFlinkWriteClient.getHoodieTable()`, `FlinkUpgradeDowngradeHelper.getTable()`, `WriteProfile.getTable()`, and `FlinkTables.createTableInternal`. The new lazy default is the right intent for the per-task write client path, but `FlinkUpgradeDowngradeHelper` (and arguably `WriteProfile`) likely expect the timeline to be loaded: upgrade/downgrade logic reasons over completed instants, and `WriteProfile` walks `getCommitsTimeline()`. Was this intentional for those callers, or would it be safer to keep the 2-arg version delegating with `true` and have only `HoodieFlinkWriteClient.getHoodieTable(false)` opt in to lazy loading?
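A sketch of the alternative proposed above: the short factory keeps delegating with `true`, and only callers that can live with a lazily loaded timeline pass `false` explicitly. `TableSketch` and its methods are hypothetical; the real `HoodieFlinkTable.create` also takes a `HoodieWriteConfig` and a `HoodieEngineContext`, which are omitted here.

```java
// Hypothetical stand-in for HoodieFlinkTable; only the overload shape matters here.
final class TableSketch {
  private final boolean timelineLoaded;

  private TableSketch(boolean loadActiveTimelineOnLoad) {
    this.timelineLoaded = loadActiveTimelineOnLoad;
  }

  // Short overload keeps the historical eager default, so existing callers
  // (e.g. upgrade/downgrade, write profiles) still see a loaded timeline.
  static TableSketch create() {
    return create(true);
  }

  // Callers that tolerate lazy loading opt in explicitly.
  static TableSketch create(boolean loadActiveTimelineOnLoad) {
    return new TableSketch(loadActiveTimelineOnLoad);
  }

  boolean isTimelineLoaded() {
    return timelineLoaded;
  }
}
```

With this shape, the behavior change is confined to the call sites that ask for it, and a reviewer can grep for `create(..., false)` to find every lazy caller.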

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -312,12 +351,48 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
     this.gateways[i] = subtaskGateway;
   }
+  @Override
+  public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
+    CompletableFuture<CoordinationResponse> response = new CompletableFuture<>();
+    instantRequestExecutor.execute(() -> {
+      Correspondent.InstantTimeRequest instantTimeRequest = (Correspondent.InstantTimeRequest) request;
+      long checkpointId = instantTimeRequest.getCheckpointId();
+      Pair<String, WriteMetadataEvent[]> instantTimeAndEventBuffer = this.eventBuffers.getInstantAndEventBuffer(checkpointId);
+      final String instantTime;
+      if (instantTimeAndEventBuffer == null) {
+        // wait until previous instants are committed.
+        eventBuffers.awaitAllInstantsToCompleteIfNecessary();
+        instantTime = startInstant();
+        this.eventBuffers.initNewEventBuffer(checkpointId, instantTime, this.parallelism);
+      } else {
+        instantTime = instantTimeAndEventBuffer.getLeft();

Review Comment:
🤖 If `startInstant()` throws here (e.g. the timeline transition fails or the lock can't be acquired), the `response` future is never completed, because the body is not wrapped in a try/catch that completes it exceptionally. The executor's exception hook will call `context.failJob(...)`, but until the JM tears the task down, the writer stays blocked on `response.get()` inside `Correspondent.requestInstantTime`. Could we wrap this body in `try { ... } catch (Throwable t) { response.completeExceptionally(t); throw t; }` so the writer fails fast rather than hanging?
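A sketch of the try/catch suggested above, pulled out into a hypothetical helper (`CoordinatorRequestSketch.handleRequest` is not a Hudi or Flink API). The `Supplier` stands in for the part of `handleCoordinationRequest` that may call `startInstant()`; whatever it throws completes the future exceptionally before being rethrown, so a caller blocked on `response.get()` sees the failure instead of waiting for the job to be torn down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical helper; in the PR this logic sits inline in handleCoordinationRequest.
final class CoordinatorRequestSketch {
  static <T> CompletableFuture<T> handleRequest(Executor executor, Supplier<T> work) {
    CompletableFuture<T> response = new CompletableFuture<>();
    executor.execute(() -> {
      try {
        // e.g. wait for pending instants, then start a new one
        response.complete(work.get());
      } catch (Throwable t) {
        // Unblock whoever is waiting on the future first...
        response.completeExceptionally(t);
        // ...then rethrow so the executor's failure hook still fires.
        throw t;
      }
    });
    return response;
  }
}
```

`throw t` compiles inside the `Runnable` lambda thanks to Java's precise rethrow: the `try` block cannot throw checked exceptions, so `t` can only be unchecked.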

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -582,7 +582,7 @@ private FlinkOptions() {
   public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions
       .key("write.commit.ack.timeout")
       .longType()
-      .defaultValue(-1L) // default at least once
+      .defaultValue(300_000L)

Review Comment:
🤖 Changing the default from `-1` (at-least-once / wait indefinitely) to `300_000` is a meaningful behavior change for existing users. The new code also passes this value directly to `CommitGuard.create(...)`, and `Condition.await(-1, MILLIS)` returns `false` immediately, so anyone who explicitly set the previous default in their config will start getting `HoodieException: Timeout(-1ms)` errors. Could we either: (a) call this out in the PR description as a breaking change for users with an explicit `-1`, or (b) treat `-1` as "no timeout" inside `CommitGuard` (skip the timed `await` and use the untimed one)?

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java:
##########

Review Comment:
🤖 **Line 269:** This silently overrides the user-configured `hoodie.cleaner.policy.failed.writes` to LAZY whenever non-blocking instant generation is enabled, without logging anything. A user who explicitly set EAGER will likely be confused when the behavior doesn't match their config. Could you at least log a warning when overriding a user-provided value (e.g. when the conf already contains the key with a non-LAZY value)? Alternatively, consider validating up front and failing fast with a clear error.
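A minimal sketch of the warn-on-override option, assuming the options are exposed as a plain `Map<String, String>` and using `java.util.logging` so the block stays self-contained (Hudi itself logs through SLF4J). `FailedWritesPolicyOverrideSketch` and `applyFailedWritesPolicy` are hypothetical names; only the key `hoodie.cleaner.policy.failed.writes` and the `LAZY`/`EAGER` values come from the comment.

```java
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper; the real override lives in FlinkWriteClients.
final class FailedWritesPolicyOverrideSketch {
  private static final Logger LOG = Logger.getLogger(FailedWritesPolicyOverrideSketch.class.getName());
  static final String KEY = "hoodie.cleaner.policy.failed.writes";

  /**
   * Forces LAZY when non-blocking instant generation is enabled, logging a warning
   * if that replaces a different value the user configured explicitly.
   *
   * @return true if a user-provided non-LAZY value was overridden
   */
  static boolean applyFailedWritesPolicy(Map<String, String> conf, boolean nonBlockingInstantGeneration) {
    if (!nonBlockingInstantGeneration) {
      return false; // leave the user's choice untouched
    }
    String userValue = conf.get(KEY);
    boolean overridingUserValue = userValue != null && !"LAZY".equalsIgnoreCase(userValue);
    if (overridingUserValue) {
      LOG.warning("Overriding " + KEY + "=" + userValue
          + " with LAZY because non-blocking instant generation is enabled");
    }
    conf.put(KEY, "LAZY");
    return overridingUserValue;
  }
}
```

The fail-fast alternative would replace the `LOG.warning(...)` call with an exception, trading a surprising override for a startup error.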

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Utilities for coordinator event buffer.
+ */
+public class EventBuffers implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  // {checkpointId -> (instant, events)}
+  private final Map<Long, Pair<String, WriteMetadataEvent[]>> eventBuffers;
+  private final Option<CommitGuard> commitGuardOption;
+
+  private EventBuffers(Map<Long, Pair<String, WriteMetadataEvent[]>> eventBuffers, Option<CommitGuard> commitGuardOption) {
+    this.eventBuffers = eventBuffers;
+    this.commitGuardOption = commitGuardOption;
+  }
+
+  public static EventBuffers getInstance(Configuration conf) {
+    final Option<CommitGuard> commitGuardOpt = OptionsResolver.isBlockingInstantGeneration(conf)
+        ? Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))) : Option.empty();
+    return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt);
+  }
+
+  /**
+   * Checks the buffer is ready to commit.
+   */
+  public static boolean allEventsReceived(WriteMetadataEvent[] eventBuffer) {
+    return Arrays.stream(eventBuffer)
+        // we do not use event.isReady to check the instant
+        // because the write task may send an event eagerly for empty
+        // data set, the even may have a timestamp of last committed instant.
+        .allMatch(event -> event != null && event.isLastBatch());
+  }
+
+  public WriteMetadataEvent[] addEventToBuffer(WriteMetadataEvent event) {
+    WriteMetadataEvent[] eventBuffer = this.eventBuffers.get(event.getCheckpointId()).getRight();
+    if (eventBuffer[event.getTaskID()] != null
+        && eventBuffer[event.getTaskID()].getInstantTime().equals(event.getInstantTime())) {
+      eventBuffer[event.getTaskID()].mergeWith(event);
+    } else {
+      eventBuffer[event.getTaskID()] = event;
+    }
+    return eventBuffer;
+  }
+
+  public void addEventsToBuffer(Map<Long, Pair<String, WriteMetadataEvent[]>> events) {
+    this.eventBuffers.putAll(events);
+  }
+
+  /**
+   * Returns existing bootstrap buffer or creates a new one.
+   */
+  public WriteMetadataEvent[] getOrCreateBootstrapBuffer(WriteMetadataEvent event, int parallelism) {
+    return this.eventBuffers.computeIfAbsent(event.getCheckpointId(),
+        ckpId -> Pair.of(event.getInstantTime(), new WriteMetadataEvent[parallelism])).getRight();
+  }
+
+  /**
+   * Cleans the task events triggered after or on the give event checkpoint ID.
+   */
+  public void cleanLegacyEvents(WriteMetadataEvent event) {
+    this.eventBuffers.entrySet().stream()
+        .filter(entry -> entry.getKey().compareTo(event.getCheckpointId()) >= 0)
+        .map(entry -> entry.getValue().getRight())
+        .forEach(eventBuffer -> resetBufferAt(eventBuffer, event.getTaskID()));
+  }
+
+  private static void resetBufferAt(WriteMetadataEvent[] eventBuffer, int idx) {
+    if (eventBuffer.length > idx) {
+      eventBuffer[idx] = null;
+    }
+  }
+
+  /**
+   * Returns the pair of instant time and event buffer.
+   */
+  public Pair<String, WriteMetadataEvent[]> getInstantAndEventBuffer(long checkpointId) {
+    return this.eventBuffers.get(checkpointId);
+  }
+
+  public WriteMetadataEvent[] getLatestEventBuffer(String instantTime) {
+    return eventBuffers.values().stream().filter(val -> val.getLeft().equals(instantTime)).findFirst().map(Pair::getRight).orElse(null);
+  }
+
+  public WriteMetadataEvent[] getEventBuffer(long checkpointId) {
+    return Option.ofNullable(this.eventBuffers.get(checkpointId)).map(Pair::getRight).orElse(null);
+  }
+
+  public Stream<Map.Entry<Long, Pair<String, WriteMetadataEvent[]>>> getEventBufferStream() {
+    return this.eventBuffers.entrySet().stream();
+  }
+
+  public void initNewEventBuffer(long checkpointId, String instantTime, int parallelism) {
+    this.eventBuffers.put(checkpointId, Pair.of(instantTime, new WriteMetadataEvent[parallelism]));
+  }
+
+  public void awaitAllInstantsToCompleteIfNecessary() {
+    if (this.commitGuardOption.isPresent() && nonEmpty()) {
+      this.commitGuardOption.get().blockFor(getPendingInstants());
+    }
+  }
+
+  public void reset(long checkpointId) {
+    this.eventBuffers.remove(checkpointId);
+    this.commitGuardOption.ifPresent(CommitGuard::unblock);
+  }
+
+  public boolean nonEmpty() {
+    return this.eventBuffers.values().stream()
+        .map(Pair::getValue)
+        .flatMap(Arrays::stream).anyMatch(Objects::nonNull);
+  }
+
+  public String getPendingInstants() {
+    return this.eventBuffers.values().stream().map(Pair::getKey).collect(Collectors.joining(","));
+  }
+
+  /**
+   * Get write metadata events where there exists no event sent by eager flushing from writers.
+   */
+  public Map<Long, Pair<String, WriteMetadataEvent[]>> getAllCompletedEvents() {

Review Comment:
🤖 nit: `getAllCompletedEvents()` could mislead future readers into thinking these are events for Hudi-completed (committed) instants. The Javadoc says it filters out in-flight eager-flush entries, so a name like `getCheckpointableBuffers()` or `getAllLastBatchBuffers()` might signal the intent more clearly.

##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java:
##########

Review Comment:
🤖 **Line 306:** `AssertionError` extends `Error`, not `Exception`, so if `checkWrittenData(expected, 1)` fails inside `t1`, the catch block won't catch it. The error only reaches the thread's uncaught-exception handler, `t1.join()` returns normally, and the test passes despite the failed assertion. This may be why `TestWriteMergeOnReadWithCompact.testBlockedInstantTimeRequest` documents the test as flaky. Could you catch `Throwable` and store it in an `AtomicReference` for the main thread to rethrow after `join()`?
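A minimal sketch of the `AtomicReference` approach, assuming the worker body can be passed in as a `Runnable`; `ThreadAssertionSketch.runAndPropagate` is a hypothetical helper, not part of the Hudi test utilities. Catching `Throwable` is the key change, since `AssertionError` is an `Error`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper showing how a test can surface failures from a worker thread.
final class ThreadAssertionSketch {
  static void runAndPropagate(Runnable body) {
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Thread t1 = new Thread(() -> {
      try {
        body.run();
      } catch (Throwable t) {
        // Throwable, not Exception: AssertionError extends Error and would otherwise escape.
        failure.set(t);
      }
    });
    t1.start();
    try {
      t1.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the worker thread", e);
    }
    Throwable error = failure.get();
    if (error != null) {
      // Rethrow on the test thread so the test fails instead of passing silently.
      throw new AssertionError("Worker thread failed", error);
    }
  }
}
```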

##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java:
##########

Review Comment:
🤖 **Line 185:** nit: `writeConfig` is assigned here but never used after this point; it looks like a leftover from the `CkpMetadata` replacement. Could you remove it (or actually use it if something depends on this call)?

##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java:
##########

Review Comment:
🤖 **Line 556:** nit: in `assertDataFilesExists` the subject "files" is plural, so it might read more naturally as `assertDataFilesExist` (the sibling `assertEmptyBaseFiles` already follows this pattern implicitly).

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The commit guard used for blocking instant time generation.
+ *
+ * <p>A new instant time can not be generated until all the pending instants are committed.
+ */
+public class CommitGuard {
+  /**
+   * A lock used to block instant generation until being signaled after pending instant is committed.
+   */
+  private final Lock lock;
+  private final Condition condition;
+  private final long commitAckTimeout;
+
+  private CommitGuard(long commitAckTimeout) {
+    this.lock = new ReentrantLock();
+    this.condition = lock.newCondition();
+    this.commitAckTimeout = commitAckTimeout;
+  }
+
+  public static CommitGuard create(long commitAckTimeout) {
+    return new CommitGuard(commitAckTimeout);
+  }
+
+  /**
+   * Wait until all the pending instants are committed.
+   *
+   * @param instants Current pending instants
+   */
+  public void blockFor(String instants) {
+    lock.lock();
+    try {
+      if (!condition.await(commitAckTimeout, TimeUnit.MILLISECONDS)) {
+        throw new HoodieException("Timeout(" + commitAckTimeout + "ms) while waiting for instants [" + instants + "] to commit");
+      }

Review Comment:
🤖 This `await` isn't guarded by a predicate, which exposes two issues: (1) if `unblock()` runs before another thread enters `await()`, the signal is lost and the waiter blocks for the full `commitAckTimeout`; (2) every `EventBuffers.reset(...)` calls `unblock()`, so if multiple checkpoints are pending, the first commit wakes the waiter and `blockFor` returns even though instants are still pending. For CoW upsert (where `isBlockingInstantGeneration` is true), that breaks the intended serialization across instants.

Could we loop while `pendingInstants` is non-empty (similar to the supplier-based pattern), so the waiter only proceeds once all pending instants are actually drained? @danny0405

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/ExplicitClassloaderThreadFactory.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.adapter.Utils;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * A thread factory with explicit classloader.
+ */
+public class ExplicitClassloaderThreadFactory implements ThreadFactory {
+  private final String threadName;
+  private final ClassLoader cl;
+  private final Thread.UncaughtExceptionHandler errorHandler;
+
+  private Thread t;

Review Comment:
🤖 nit: `t` is a pretty opaque name for the field tracking the single created thread; `thread` (or `singleThread`) would make the `if (t != null)` guard in `newThread` immediately readable.
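Returning to the `CommitGuard.blockFor` comment: a sketch of a predicate-guarded wait, assuming the guard tracks the pending instants itself. `GuardedCommitGuardSketch` and its `addPending`/`markCommitted`/`blockUntilDrained` methods are hypothetical, and `IllegalStateException` stands in for `HoodieException` to keep the block self-contained. It also folds in option (b) from the `write.commit.ack.timeout` comment by treating a negative timeout as "wait indefinitely".

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical predicate-guarded variant of CommitGuard.
final class GuardedCommitGuardSketch {
  private final Lock lock = new ReentrantLock();
  private final Condition drained = lock.newCondition();
  private final Set<String> pendingInstants = new LinkedHashSet<>();
  private final long commitAckTimeoutMs; // negative means "no timeout"

  GuardedCommitGuardSketch(long commitAckTimeoutMs) {
    this.commitAckTimeoutMs = commitAckTimeoutMs;
  }

  void addPending(String instant) {
    lock.lock();
    try {
      pendingInstants.add(instant);
    } finally {
      lock.unlock();
    }
  }

  // Called after an instant commits; waiters wake up and re-check the predicate.
  void markCommitted(String instant) {
    lock.lock();
    try {
      pendingInstants.remove(instant);
      drained.signalAll();
    } finally {
      lock.unlock();
    }
  }

  // Returns only once no instants are pending, or throws on timeout.
  void blockUntilDrained() {
    lock.lock();
    try {
      long remainingNanos = TimeUnit.MILLISECONDS.toNanos(commitAckTimeoutMs);
      while (!pendingInstants.isEmpty()) {
        if (commitAckTimeoutMs < 0) {
          drained.await(); // untimed wait, e.g. for the legacy -1 setting
        } else if (remainingNanos <= 0) {
          throw new IllegalStateException("Timeout(" + commitAckTimeoutMs
              + "ms) while waiting for instants " + pendingInstants + " to commit");
        } else {
          // awaitNanos returns the remaining budget, so wakeups don't reset the timeout.
          remainingNanos = drained.awaitNanos(remainingNanos);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for pending instants", e);
    } finally {
      lock.unlock();
    }
  }
}
```

Because the loop re-checks `pendingInstants` under the lock, a commit that lands before the caller starts waiting just makes the loop exit at once, and committing one of several pending instants no longer releases the waiter early.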

##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/keyedstate/MockKeyedValueState.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils.keyedstate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.hudi.sink.utils.MockValueState;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Mock keyed list state for testing.
+ * @param <V> Type of state value

Review Comment:
🤖 nit: the Javadoc here says "Mock keyed **list** state for testing", which looks like a copy-paste from `MockKeyedListState`. Could you update it to "Mock keyed value state for testing"?

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java:
##########
@@ -108,6 +111,14 @@ public void setTaskID(int taskID) {
     this.taskID = taskID;
   }
+  public Long getCheckpointId() {

Review Comment:
🤖 The field is a primitive `long`, but the getter returns a boxed `Long`.
This works via autoboxing, but it allocates and can surprise callers who write `event.getCheckpointId() == someValue` when `someValue` is also a `Long` (that comparison checks references, not values). Could we make this `public long getCheckpointId()` for consistency with the field type and the setter signature?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
