This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d393d2a82f [Fix][Connector-V2][Hbase] Fix source reader only scanning first split (#10287)
d393d2a82f is described below
commit d393d2a82f2ac39f7a6817c8aa43a363037c4012
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Jan 7 22:04:36 2026 +0800
[Fix][Connector-V2][Hbase] Fix source reader only scanning first split (#10287)
---
.../seatunnel/hbase/source/HbaseSourceReader.java | 40 ++++---
.../hbase/source/HbaseSourceReaderTest.java | 122 +++++++++++++++++++++
2 files changed, 145 insertions(+), 17 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
index aa64812632..4bf1fec791 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
@@ -62,10 +63,22 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
private HBaseDeserializationFormat hbaseDeserializationFormat =
new HBaseDeserializationFormat();
- private ResultScanner currentScanner;
public HbaseSourceReader(
HbaseParameters hbaseParameters, Context context, SeaTunnelRowType seaTunnelRowType) {
+ this(
+ hbaseParameters,
+ context,
+ seaTunnelRowType,
+ HbaseClient.createInstance(hbaseParameters));
+ }
+
+ @VisibleForTesting
+ HbaseSourceReader(
+ HbaseParameters hbaseParameters,
+ Context context,
+ SeaTunnelRowType seaTunnelRowType,
+ HbaseClient hbaseClient) {
this.hbaseParameters = hbaseParameters;
this.context = context;
this.seaTunnelRowType = seaTunnelRowType;
@@ -82,7 +95,7 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
Preconditions.checkArgument(
column.contains(":") && column.split(":").length == 2,
"Invalid column names, it should be [ColumnFamily:Column] format"));
- hbaseClient = HbaseClient.createInstance(hbaseParameters);
+ this.hbaseClient = hbaseClient;
}
@Override
@@ -92,13 +105,6 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
@Override
public void close() throws IOException {
- if (this.currentScanner != null) {
- try {
- this.currentScanner.close();
- } catch (Exception e) {
- throw new IOException("Failed to close HBase Scanner.", e);
- }
- }
if (this.hbaseClient != null) {
try {
this.hbaseClient.close();
@@ -115,14 +121,14 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
final HbaseSourceSplit split = sourceSplits.poll();
if (Objects.nonNull(split)) {
// read logic
- if (currentScanner == null) {
- currentScanner = hbaseClient.scan(split, hbaseParameters, this.columnNames);
- }
- for (Result result : currentScanner) {
- SeaTunnelRow seaTunnelRow =
- hbaseDeserializationFormat.deserialize(
- convertRawRow(result), seaTunnelRowType);
- output.collect(seaTunnelRow);
+ try (ResultScanner scanner =
+ hbaseClient.scan(split, hbaseParameters, this.columnNames)) {
+ for (Result result : scanner) {
+ SeaTunnelRow seaTunnelRow =
+ hbaseDeserializationFormat.deserialize(
+ convertRawRow(result), seaTunnelRowType);
+ output.collect(seaTunnelRow);
+ }
}
} else if (noMoreSplit && sourceSplits.isEmpty()) {
// signal to the source that we have reached the end of the data.
diff --git a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReaderTest.java b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReaderTest.java
new file mode 100644
index 0000000000..b98b330193
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReaderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HbaseSourceReaderTest {
+
+ private static class CountingCollector implements Collector<SeaTunnelRow> {
+ private final Object checkpointLock = new Object();
+ private int count;
+
+ @Override
+ public void collect(SeaTunnelRow record) {
+ count++;
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+
+ public int getCount() {
+ return count;
+ }
+ }
+
+ @Test
+ void testPollNextReadsAllSplits() throws Exception {
+ HbaseParameters hbaseParameters = mock(HbaseParameters.class);
+ when(hbaseParameters.getTable()).thenReturn("test_table");
+
+ SourceReader.Context readerContext = mock(SourceReader.Context.class);
+ HbaseClient hbaseClient = mock(HbaseClient.class);
+
+ SeaTunnelRowType seaTunnelRowType =
+ new SeaTunnelRowType(
+ new String[] {"rowkey", "cf1:id", "cf1:name"},
+ new SeaTunnelDataType[] {
+ BasicType.STRING_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE
+ });
+
+ HbaseSourceReader reader =
+ new HbaseSourceReader(
+ hbaseParameters, readerContext, seaTunnelRowType, hbaseClient);
+
+ HbaseSourceSplit split0 = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));
+ HbaseSourceSplit split1 = new HbaseSourceSplit(1, Bytes.toBytes("b"), Bytes.toBytes("c"));
+
+ Result result0 = mock(Result.class);
+ when(result0.getRow()).thenReturn(Bytes.toBytes("row0"));
+ when(result0.getValue(any(byte[].class), any(byte[].class)))
+ .thenReturn(Bytes.toBytes("v0"));
+
+ Result result1 = mock(Result.class);
+ when(result1.getRow()).thenReturn(Bytes.toBytes("row1"));
+ when(result1.getValue(any(byte[].class), any(byte[].class)))
+ .thenReturn(Bytes.toBytes("v1"));
+
+ ResultScanner scanner0 = mock(ResultScanner.class);
+ when(scanner0.iterator()).thenReturn(Arrays.asList(result0).iterator());
+ ResultScanner scanner1 = mock(ResultScanner.class);
+ when(scanner1.iterator()).thenReturn(Arrays.asList(result1).iterator());
+
+ when(hbaseClient.scan(eq(split0), eq(hbaseParameters), anyList())).thenReturn(scanner0);
+ when(hbaseClient.scan(eq(split1), eq(hbaseParameters), anyList())).thenReturn(scanner1);
+
+ reader.addSplits(Arrays.asList(split0, split1));
+ reader.handleNoMoreSplits();
+
+ CountingCollector collector = new CountingCollector();
+ reader.pollNext(collector);
+ reader.pollNext(collector);
+ reader.pollNext(collector);
+
+ assertEquals(2, collector.getCount());
+ verify(hbaseClient, times(1)).scan(eq(split0), eq(hbaseParameters), anyList());
+ verify(hbaseClient, times(1)).scan(eq(split1), eq(hbaseParameters), anyList());
+ verify(scanner0, times(1)).close();
+ verify(scanner1, times(1)).close();
+ verify(readerContext, times(1)).signalNoMoreElement();
+ }
+}