This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new bb842561e [client] move OffsetsInitializer to fluss-client (#2424)
bb842561e is described below
commit bb842561e458fa4a37114b4ac36f6f7309a1f9a8
Author: Yann Byron <[email protected]>
AuthorDate: Wed Jan 21 15:55:31 2026 +0800
[client] move OffsetsInitializer to fluss-client (#2424)
---
.../client}/initializer/BucketOffsetsRetrieverImpl.java | 14 ++++++--------
.../client}/initializer/EarliestOffsetsInitializer.java | 9 ++-------
.../client}/initializer/LatestOffsetsInitializer.java | 2 +-
.../client}/initializer/NoStoppingOffsetsInitializer.java | 4 ++--
.../fluss/client}/initializer/OffsetsInitializer.java | 5 ++---
.../client}/initializer/SnapshotOffsetsInitializer.java | 2 +-
.../client}/initializer/TimestampOffsetsInitializer.java | 2 +-
.../apache/fluss/client/table/scanner/log/LogScanner.java | 6 ++++++
.../org/apache/fluss/flink/lake/LakeSplitGenerator.java | 2 +-
.../java/org/apache/fluss/flink/source/FlinkSource.java | 2 +-
.../org/apache/fluss/flink/source/FlinkTableSource.java | 2 +-
.../java/org/apache/fluss/flink/source/FlussSource.java | 2 +-
.../org/apache/fluss/flink/source/FlussSourceBuilder.java | 2 +-
.../flink/source/enumerator/FlinkSourceEnumerator.java | 10 +++++-----
.../java/org/apache/fluss/flink/source/split/LogSplit.java | 3 ++-
.../flink/tiering/source/split/TieringSplitGenerator.java | 4 ++--
.../apache/fluss/flink/source/FlussSourceBuilderTest.java | 2 +-
.../org/apache/fluss/flink/source/FlussSourceITCase.java | 2 +-
.../flink/source/enumerator/FlinkSourceEnumeratorTest.java | 2 +-
19 files changed, 38 insertions(+), 39 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
similarity index 88%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java
rename to fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
index b141a25a5..e868a84cc 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
@@ -15,16 +15,14 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.admin.ListOffsetsResult;
import org.apache.fluss.client.admin.OffsetSpec;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer.BucketOffsetsRetriever;
+import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.metadata.TablePath;
-
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.fluss.utils.ExceptionUtils;
import javax.annotation.Nullable;
@@ -36,7 +34,7 @@ import java.util.concurrent.ExecutionException;
import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
/** The default implementation for offsets retriever. */
-public class BucketOffsetsRetrieverImpl implements BucketOffsetsRetriever {
+public class BucketOffsetsRetrieverImpl implements OffsetsInitializer.BucketOffsetsRetriever {
private final Admin flussAdmin;
private final TablePath tablePath;
@@ -79,10 +77,10 @@ public class BucketOffsetsRetrieverImpl implements BucketOffsetsRetriever {
return result.all().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new FlinkRuntimeException(
+ throw new FlussRuntimeException(
"Interrupted while listing offsets for table buckets: " + buckets, e);
} catch (ExecutionException e) {
- throw new FlinkRuntimeException(
+ throw new FlussRuntimeException(
"Failed to list offsets for table buckets: " + buckets + " due to",
ExceptionUtils.stripExecutionException(e));
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/EarliestOffsetsInitializer.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/EarliestOffsetsInitializer.java
similarity index 83%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/EarliestOffsetsInitializer.java
rename to fluss-client/src/main/java/org/apache/fluss/client/initializer/EarliestOffsetsInitializer.java
index 144f9dd5c..d43cdbc3d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/EarliestOffsetsInitializer.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/EarliestOffsetsInitializer.java
@@ -15,10 +15,7 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.source.enumerator.initializer;
-
-import org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator;
-import org.apache.fluss.flink.source.reader.FlinkSourceSplitReader;
+package org.apache.fluss.client.initializer;
import javax.annotation.Nullable;
@@ -29,9 +26,7 @@ import java.util.Map;
import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
/**
- * A initializer that initialize the buckets to the earliest offsets. The offsets initialization are
- * taken care of by the {@link FlinkSourceSplitReader} instead of by the {@link
- * FlinkSourceEnumerator}.
+ * A initializer that initialize the buckets to the earliest offsets.
*
* <p>Package private and should be instantiated via {@link OffsetsInitializer}.
*/
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/LatestOffsetsInitializer.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/LatestOffsetsInitializer.java
similarity index 96%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/LatestOffsetsInitializer.java
rename to fluss-client/src/main/java/org/apache/fluss/client/initializer/LatestOffsetsInitializer.java
index 71b96d84c..df8d3ca75 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/LatestOffsetsInitializer.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/LatestOffsetsInitializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
import javax.annotation.Nullable;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/NoStoppingOffsetsInitializer.java
similarity index 91%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
rename to fluss-client/src/main/java/org/apache/fluss/client/initializer/NoStoppingOffsetsInitializer.java
index 49f0d5ff7..3a2f54a0d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/NoStoppingOffsetsInitializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
import javax.annotation.Nullable;
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.fluss.flink.source.split.LogSplit.NO_STOPPING_OFFSET;
+import static org.apache.fluss.client.table.scanner.log.LogScanner.NO_STOPPING_OFFSET;
/**
* An implementation of {@link OffsetsInitializer} which does not initialize anything.
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/OffsetsInitializer.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/OffsetsInitializer.java
similarity index 95%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/OffsetsInitializer.java
rename to fluss-client/src/main/java/org/apache/fluss/client/initializer/OffsetsInitializer.java
index 8184f02a7..bd33a25ef 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/OffsetsInitializer.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/OffsetsInitializer.java
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.admin.OffsetSpec;
-import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.metadata.TablePath;
import javax.annotation.Nullable;
@@ -28,7 +27,7 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-/** An interface for users to specify the starting offset of a {@link SourceSplitBase}. */
+/** An interface for users to specify the starting offset. */
public interface OffsetsInitializer extends Serializable {
/**
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/SnapshotOffsetsInitializer.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/SnapshotOffsetsInitializer.java
similarity index 96%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/SnapshotOffsetsInitializer.java
rename to fluss-client/src/main/java/org/apache/fluss/client/initializer/SnapshotOffsetsInitializer.java
index 916b84177..1651d75f2 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/SnapshotOffsetsInitializer.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/SnapshotOffsetsInitializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
import javax.annotation.Nullable;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/TimestampOffsetsInitializer.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/TimestampOffsetsInitializer.java
similarity index 96%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/TimestampOffsetsInitializer.java
rename to fluss-client/src/main/java/org/apache/fluss/client/initializer/TimestampOffsetsInitializer.java
index 11c4cf90b..5a21c68ec 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/TimestampOffsetsInitializer.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/TimestampOffsetsInitializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
import javax.annotation.Nullable;
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
index e8d9ba6a0..04357f598 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
@@ -35,6 +35,12 @@ public interface LogScanner extends AutoCloseable {
*/
long EARLIEST_OFFSET = -2L;
+ /**
+ * The latest offset to fetch to. Fluss uses this to indicate the default stopping offset for
+ * unbounded Fluss sources.
+ */
+ long NO_STOPPING_OFFSET = Long.MIN_VALUE;
+
/**
* Poll log data from tablet server.
*
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
index 42038cd74..86384226d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
@@ -18,11 +18,11 @@
package org.apache.fluss.flink.lake;
import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.flink.source.split.LogSplit;
import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.lake.source.LakeSource;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index ba880ff91..768fa7dda 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -17,12 +17,12 @@
package org.apache.fluss.flink.source;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl;
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
import org.apache.fluss.flink.source.emitter.FlinkRecordEmitter;
import org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
import org.apache.fluss.flink.source.reader.FlinkSourceReader;
import org.apache.fluss.flink.source.reader.RecordAndPos;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 45ce7945a..bbfbff402 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -17,11 +17,11 @@
package org.apache.fluss.flink.source;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction;
import org.apache.fluss.flink.source.lookup.FlinkLookupFunction;
import org.apache.fluss.flink.source.lookup.LookupNormalizer;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
index 427741834..05fe4e276 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
@@ -18,9 +18,9 @@
package org.apache.fluss.flink.source;
import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.RowType;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
index f16639753..afd955c01 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
@@ -20,11 +20,11 @@ package org.apache.fluss.flink.source;
import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.RowType;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 9df9502ff..a22c61d71 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -20,17 +20,17 @@ package org.apache.fluss.flink.source.enumerator;
import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.initializer.BucketOffsetsRetrieverImpl;
+import org.apache.fluss.client.initializer.NoStoppingOffsetsInitializer;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
+import org.apache.fluss.client.initializer.OffsetsInitializer.BucketOffsetsRetriever;
+import org.apache.fluss.client.initializer.SnapshotOffsetsInitializer;
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.lake.LakeSplitGenerator;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
-import org.apache.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
-import org.apache.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer.BucketOffsetsRetriever;
-import org.apache.fluss.flink.source.enumerator.initializer.SnapshotOffsetsInitializer;
import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/LogSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/LogSplit.java
index cefdc9702..497e46ad5 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/LogSplit.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/LogSplit.java
@@ -17,6 +17,7 @@
package org.apache.fluss.flink.source.split;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.metadata.TableBucket;
import javax.annotation.Nullable;
@@ -27,7 +28,7 @@ import java.util.Optional;
/** The split for log. It's used to describe the log data of a table bucket. */
public class LogSplit extends SourceSplitBase {
- public static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
+ public static final long NO_STOPPING_OFFSET = LogScanner.NO_STOPPING_OFFSET;
private static final String LOG_SPLIT_PREFIX = "log-";
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
index eb207f8ea..373118a97 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
@@ -18,11 +18,11 @@
package org.apache.fluss.flink.tiering.source.split;
import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.initializer.BucketOffsetsRetrieverImpl;
+import org.apache.fluss.client.initializer.OffsetsInitializer.BucketOffsetsRetriever;
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
-import org.apache.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer.BucketOffsetsRetriever;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
index 5091d29d4..0f54fc04b 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
@@ -17,9 +17,9 @@
package org.apache.fluss.flink.source;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.flink.utils.FlinkTestBase;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.InternalRow;
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java
index 009845500..9938fcce5 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java
@@ -17,11 +17,11 @@
package org.apache.fluss.flink.source;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.writer.AppendWriter;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.flink.source.testutils.MockDataUtils;
import org.apache.fluss.flink.source.testutils.Order;
import org.apache.fluss.flink.source.testutils.OrderPartial;
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index 0e9260a54..b18adfc45 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -17,6 +17,7 @@
package org.apache.fluss.flink.source.enumerator;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.client.write.HashBucketAssigner;
@@ -24,7 +25,6 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;