This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new bb842561e [client] move OffsetsInitializer to fluss-client (#2424)
bb842561e is described below

commit bb842561e458fa4a37114b4ac36f6f7309a1f9a8
Author: Yann Byron <[email protected]>
AuthorDate: Wed Jan 21 15:55:31 2026 +0800

    [client] move OffsetsInitializer to fluss-client (#2424)
---
 .../client}/initializer/BucketOffsetsRetrieverImpl.java    | 14 ++++++--------
 .../client}/initializer/EarliestOffsetsInitializer.java    |  9 ++-------
 .../client}/initializer/LatestOffsetsInitializer.java      |  2 +-
 .../client}/initializer/NoStoppingOffsetsInitializer.java  |  4 ++--
 .../fluss/client}/initializer/OffsetsInitializer.java      |  5 ++---
 .../client}/initializer/SnapshotOffsetsInitializer.java    |  2 +-
 .../client}/initializer/TimestampOffsetsInitializer.java   |  2 +-
 .../apache/fluss/client/table/scanner/log/LogScanner.java  |  6 ++++++
 .../org/apache/fluss/flink/lake/LakeSplitGenerator.java    |  2 +-
 .../java/org/apache/fluss/flink/source/FlinkSource.java    |  2 +-
 .../org/apache/fluss/flink/source/FlinkTableSource.java    |  2 +-
 .../java/org/apache/fluss/flink/source/FlussSource.java    |  2 +-
 .../org/apache/fluss/flink/source/FlussSourceBuilder.java  |  2 +-
 .../flink/source/enumerator/FlinkSourceEnumerator.java     | 10 +++++-----
 .../java/org/apache/fluss/flink/source/split/LogSplit.java |  3 ++-
 .../flink/tiering/source/split/TieringSplitGenerator.java  |  4 ++--
 .../apache/fluss/flink/source/FlussSourceBuilderTest.java  |  2 +-
 .../org/apache/fluss/flink/source/FlussSourceITCase.java   |  2 +-
 .../flink/source/enumerator/FlinkSourceEnumeratorTest.java |  2 +-
 19 files changed, 38 insertions(+), 39 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
similarity index 88%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java
rename to 
fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
index b141a25a5..e868a84cc 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
@@ -15,16 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
 
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.admin.ListOffsetsResult;
 import org.apache.fluss.client.admin.OffsetSpec;
-import 
org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer.BucketOffsetsRetriever;
+import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.metadata.TablePath;
-
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.fluss.utils.ExceptionUtils;
 
 import javax.annotation.Nullable;
 
@@ -36,7 +34,7 @@ import java.util.concurrent.ExecutionException;
 import static 
org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
 
 /** The default implementation for offsets retriever. */
-public class BucketOffsetsRetrieverImpl implements BucketOffsetsRetriever {
+public class BucketOffsetsRetrieverImpl implements 
OffsetsInitializer.BucketOffsetsRetriever {
     private final Admin flussAdmin;
     private final TablePath tablePath;
 
@@ -79,10 +77,10 @@ public class BucketOffsetsRetrieverImpl implements 
BucketOffsetsRetriever {
             return result.all().get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            throw new FlinkRuntimeException(
+            throw new FlussRuntimeException(
                     "Interrupted while listing offsets for table buckets: " + 
buckets, e);
         } catch (ExecutionException e) {
-            throw new FlinkRuntimeException(
+            throw new FlussRuntimeException(
                     "Failed to list offsets for table buckets: " + buckets + " 
due to",
                     ExceptionUtils.stripExecutionException(e));
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/EarliestOffsetsInitializer.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/EarliestOffsetsInitializer.java
similarity index 83%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/EarliestOffsetsInitializer.java
rename to 
fluss-client/src/main/java/org/apache/fluss/client/initializer/EarliestOffsetsInitializer.java
index 144f9dd5c..d43cdbc3d 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/EarliestOffsetsInitializer.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/EarliestOffsetsInitializer.java
@@ -15,10 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.source.enumerator.initializer;
-
-import org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator;
-import org.apache.fluss.flink.source.reader.FlinkSourceSplitReader;
+package org.apache.fluss.client.initializer;
 
 import javax.annotation.Nullable;
 
@@ -29,9 +26,7 @@ import java.util.Map;
 import static 
org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
 
 /**
- * A initializer that initialize the buckets to the earliest offsets. The 
offsets initialization are
- * taken care of by the {@link FlinkSourceSplitReader} instead of by the {@link
- * FlinkSourceEnumerator}.
+ * An initializer that initializes the buckets to the earliest offsets.
  *
  * <p>Package private and should be instantiated via {@link 
OffsetsInitializer}.
  */
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/LatestOffsetsInitializer.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/LatestOffsetsInitializer.java
similarity index 96%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/LatestOffsetsInitializer.java
rename to 
fluss-client/src/main/java/org/apache/fluss/client/initializer/LatestOffsetsInitializer.java
index 71b96d84c..df8d3ca75 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/LatestOffsetsInitializer.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/LatestOffsetsInitializer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
 
 import javax.annotation.Nullable;
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/NoStoppingOffsetsInitializer.java
similarity index 91%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
rename to 
fluss-client/src/main/java/org/apache/fluss/client/initializer/NoStoppingOffsetsInitializer.java
index 49f0d5ff7..3a2f54a0d 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/NoStoppingOffsetsInitializer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
 
 import javax.annotation.Nullable;
 
@@ -23,7 +23,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.fluss.flink.source.split.LogSplit.NO_STOPPING_OFFSET;
+import static 
org.apache.fluss.client.table.scanner.log.LogScanner.NO_STOPPING_OFFSET;
 
 /**
  * An implementation of {@link OffsetsInitializer} which does not initialize 
anything.
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/OffsetsInitializer.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/OffsetsInitializer.java
similarity index 95%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/OffsetsInitializer.java
rename to 
fluss-client/src/main/java/org/apache/fluss/client/initializer/OffsetsInitializer.java
index 8184f02a7..bd33a25ef 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/OffsetsInitializer.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/OffsetsInitializer.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
 
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.admin.OffsetSpec;
-import org.apache.fluss.flink.source.split.SourceSplitBase;
 import org.apache.fluss.metadata.TablePath;
 
 import javax.annotation.Nullable;
@@ -28,7 +27,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 
-/** An interface for users to specify the starting offset of a {@link 
SourceSplitBase}. */
+/** An interface for users to specify the starting offset. */
 public interface OffsetsInitializer extends Serializable {
 
     /**
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/SnapshotOffsetsInitializer.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/SnapshotOffsetsInitializer.java
similarity index 96%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/SnapshotOffsetsInitializer.java
rename to 
fluss-client/src/main/java/org/apache/fluss/client/initializer/SnapshotOffsetsInitializer.java
index 916b84177..1651d75f2 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/SnapshotOffsetsInitializer.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/SnapshotOffsetsInitializer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
 
 import javax.annotation.Nullable;
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/TimestampOffsetsInitializer.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/TimestampOffsetsInitializer.java
similarity index 96%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/TimestampOffsetsInitializer.java
rename to 
fluss-client/src/main/java/org/apache/fluss/client/initializer/TimestampOffsetsInitializer.java
index 11c4cf90b..5a21c68ec 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/TimestampOffsetsInitializer.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/initializer/TimestampOffsetsInitializer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.source.enumerator.initializer;
+package org.apache.fluss.client.initializer;
 
 import javax.annotation.Nullable;
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
index e8d9ba6a0..04357f598 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
@@ -35,6 +35,12 @@ public interface LogScanner extends AutoCloseable {
      */
     long EARLIEST_OFFSET = -2L;
 
+    /**
+     * Sentinel meaning "no stopping offset". Fluss uses this to indicate the
+     * default stopping offset for unbounded Fluss sources.
+     */
+    long NO_STOPPING_OFFSET = Long.MIN_VALUE;
+
     /**
      * Poll log data from tablet server.
      *
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
index 42038cd74..86384226d 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
@@ -18,11 +18,11 @@
 package org.apache.fluss.flink.lake;
 
 import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.client.metadata.LakeSnapshot;
 import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.flink.source.split.LogSplit;
 import org.apache.fluss.flink.source.split.SourceSplitBase;
 import org.apache.fluss.lake.source.LakeSource;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index ba880ff91..768fa7dda 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -17,12 +17,12 @@
 
 package org.apache.fluss.flink.source;
 
+import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl;
 import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
 import org.apache.fluss.flink.source.emitter.FlinkRecordEmitter;
 import org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
 import org.apache.fluss.flink.source.reader.FlinkSourceReader;
 import org.apache.fluss.flink.source.reader.RecordAndPos;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 45ce7945a..bbfbff402 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -17,11 +17,11 @@
 
 package org.apache.fluss.flink.source;
 
+import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction;
 import org.apache.fluss.flink.source.lookup.FlinkLookupFunction;
 import org.apache.fluss.flink.source.lookup.LookupNormalizer;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
index 427741834..05fe4e276 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
@@ -18,9 +18,9 @@
 package org.apache.fluss.flink.source;
 
 import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.types.RowType;
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
index f16639753..afd955c01 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
@@ -20,11 +20,11 @@ package org.apache.fluss.flink.source;
 import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.types.RowType;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 9df9502ff..a22c61d71 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -20,17 +20,17 @@ package org.apache.fluss.flink.source.enumerator;
 import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.initializer.BucketOffsetsRetrieverImpl;
+import org.apache.fluss.client.initializer.NoStoppingOffsetsInitializer;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
+import 
org.apache.fluss.client.initializer.OffsetsInitializer.BucketOffsetsRetriever;
+import org.apache.fluss.client.initializer.SnapshotOffsetsInitializer;
 import org.apache.fluss.client.metadata.KvSnapshots;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.lake.LakeSplitGenerator;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
-import 
org.apache.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
-import 
org.apache.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
-import 
org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer.BucketOffsetsRetriever;
-import 
org.apache.fluss.flink.source.enumerator.initializer.SnapshotOffsetsInitializer;
 import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
 import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
 import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/LogSplit.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/LogSplit.java
index cefdc9702..497e46ad5 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/LogSplit.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/LogSplit.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.flink.source.split;
 
+import org.apache.fluss.client.table.scanner.log.LogScanner;
 import org.apache.fluss.metadata.TableBucket;
 
 import javax.annotation.Nullable;
@@ -27,7 +28,7 @@ import java.util.Optional;
 /** The split for log. It's used to describe the log data of a table bucket. */
 public class LogSplit extends SourceSplitBase {
 
-    public static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
+    public static final long NO_STOPPING_OFFSET = 
LogScanner.NO_STOPPING_OFFSET;
 
     private static final String LOG_SPLIT_PREFIX = "log-";
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
index eb207f8ea..373118a97 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
@@ -18,11 +18,11 @@
 package org.apache.fluss.flink.tiering.source.split;
 
 import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.initializer.BucketOffsetsRetrieverImpl;
+import 
org.apache.fluss.client.initializer.OffsetsInitializer.BucketOffsetsRetriever;
 import org.apache.fluss.client.metadata.KvSnapshots;
 import org.apache.fluss.client.metadata.LakeSnapshot;
 import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
-import 
org.apache.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
-import 
org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer.BucketOffsetsRetriever;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
index 5091d29d4..0f54fc04b 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.fluss.flink.source;
 
+import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.InternalRow;
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java
index 009845500..9938fcce5 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java
@@ -17,11 +17,11 @@
 
 package org.apache.fluss.flink.source;
 
+import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.writer.AppendWriter;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.flink.source.testutils.MockDataUtils;
 import org.apache.fluss.flink.source.testutils.Order;
 import org.apache.fluss.flink.source.testutils.OrderPartial;
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index 0e9260a54..b18adfc45 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.flink.source.enumerator;
 
+import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.client.write.HashBucketAssigner;
@@ -24,7 +25,6 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
-import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
 import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
 import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;

Reply via email to