This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/v3.0 by this push:
new 3116aa4 [FLINK-33304] Introduce the DeduplicatedMutator to resolve
the mutation write conflict problem. This closes #30
3116aa4 is described below
commit 3116aa4a2f6aa423cd8f03a634aefba4b6d373f2
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Fri Nov 3 23:12:14 2023 +0800
[FLINK-33304] Introduce the DeduplicatedMutator to resolve the mutation
write conflict problem. This closes #30
Co-authored-by: tanjialiang <[email protected]>
(cherry picked from commit e0971c3888db03243b08e5684b7690150276ef2c)
---
.../connector/hbase1/HBaseConnectorITCase.java | 59 ++++++++++++++++++++++
.../flink/connector/hbase1/util/HBaseTestBase.java | 9 ++++
.../connector/hbase2/HBaseConnectorITCase.java | 57 +++++++++++++++++++++
.../flink/connector/hbase2/util/HBaseTestBase.java | 9 ++++
.../connector/hbase/sink/HBaseSinkFunction.java | 48 ++++++++++++++++--
5 files changed, 179 insertions(+), 3 deletions(-)
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 35be358..0ea0002 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -29,10 +29,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
import org.apache.hadoop.hbase.TableName;
@@ -41,11 +44,14 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.Expressions.$;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -280,6 +286,59 @@ public class HBaseConnectorITCase extends HBaseTestBase {
TestBaseUtils.compareResultAsText(results, expected);
}
+    /**
+     * Writes an upsert changelog that touches the same row keys several times into HBase and
+     * checks that only the final state of each row survives.
+     */
+    @Test
+    public void testTableSinkWithChangelog() throws Exception {
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(
+                        StreamExecutionEnvironment.getExecutionEnvironment(), streamSettings);
+
+        // Row key 1 is inserted, deleted and re-inserted; row key 2 is written three times and
+        // then deleted. Only (1, "Hello3") is expected to remain in the sink table.
+        List<Row> changelog =
+                Arrays.asList(
+                        Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")),
+                        Row.ofKind(RowKind.DELETE, 1, Row.of("Hello2")),
+                        Row.ofKind(RowKind.INSERT, 2, Row.of("Hello1")),
+                        Row.ofKind(RowKind.INSERT, 2, Row.of("Hello2")),
+                        Row.ofKind(RowKind.INSERT, 2, Row.of("Hello3")),
+                        Row.ofKind(RowKind.DELETE, 2, Row.of("Hello3")),
+                        Row.ofKind(RowKind.INSERT, 1, Row.of("Hello3")));
+        String dataId = TestValuesTableFactory.registerData(changelog);
+
+        // source: the changelog above, served by the 'values' test connector
+        String sourceDdl =
+                "CREATE TABLE source_table ("
+                        + " rowkey INT,"
+                        + " family1 ROW<name STRING>,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'data-id' = '"
+                        + dataId
+                        + "',"
+                        + " 'changelog-mode'='I,UA,UB,D'"
+                        + ")";
+        tEnv.executeSql(sourceDdl);
+
+        // sink: the HBase table under test
+        String sinkDdl =
+                "CREATE TABLE sink_table ("
+                        + " rowkey INT,"
+                        + " family1 ROW<name STRING>,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-1.4',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_4
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")";
+        tEnv.executeSql(sinkDdl);
+
+        tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table").await();
+
+        TableResult scan = tEnv.executeSql("SELECT * FROM sink_table");
+        List<Row> rowsInHBase = CollectionUtil.iteratorToList(scan.collect());
+        Row expected = Row.of(1, Row.of("Hello3"));
+        assertThat(rowsInHBase).isEqualTo(Collections.singletonList(expected));
+    }
+
@Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index cc2de79..86110c8 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -43,6 +43,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_1 = "testTable1";
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
+ protected static final String TEST_TABLE_4 = "testTable4";
protected static final String ROW_KEY = "rowkey";
@@ -92,6 +93,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
createHBaseTable1();
createHBaseTable2();
createHBaseTable3();
+ createHBaseTable4();
}
private static void createHBaseTable1() throws IOException {
@@ -232,6 +234,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
createTable(tableName, families, SPLIT_KEYS);
}
+    /**
+     * Creates {@code TEST_TABLE_4} with the single column family {@code FAMILY1}, passing the
+     * same {@code SPLIT_KEYS} as the other test tables.
+     */
+    private static void createHBaseTable4() {
+        byte[][] columnFamilies = {Bytes.toBytes(FAMILY1)};
+        TableName table4 = TableName.valueOf(TEST_TABLE_4);
+        createTable(table4, columnFamilies, SPLIT_KEYS);
+    }
+
private static Put putRow(
int rowKey,
int f1c1,
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index c00b83a..64a9875 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
@@ -47,6 +48,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
@@ -55,6 +58,7 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.apache.flink.table.api.Expressions.$;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -312,6 +316,59 @@ public class HBaseConnectorITCase extends HBaseTestBase {
TestBaseUtils.compareResultAsText(results, String.join("", expected));
}
+    /**
+     * Writes an upsert changelog that touches the same row keys several times into HBase and
+     * checks that only the final state of each row survives.
+     */
+    @Test
+    public void testTableSinkWithChangelog() throws Exception {
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(
+                        StreamExecutionEnvironment.getExecutionEnvironment(), streamSettings);
+
+        // Row key 1 is inserted, deleted and re-inserted; row key 2 is written three times and
+        // then deleted. Only (1, "Hello3") is expected to remain in the sink table.
+        List<Row> changelog =
+                Arrays.asList(
+                        Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")),
+                        Row.ofKind(RowKind.DELETE, 1, Row.of("Hello2")),
+                        Row.ofKind(RowKind.INSERT, 2, Row.of("Hello1")),
+                        Row.ofKind(RowKind.INSERT, 2, Row.of("Hello2")),
+                        Row.ofKind(RowKind.INSERT, 2, Row.of("Hello3")),
+                        Row.ofKind(RowKind.DELETE, 2, Row.of("Hello3")),
+                        Row.ofKind(RowKind.INSERT, 1, Row.of("Hello3")));
+        String dataId = TestValuesTableFactory.registerData(changelog);
+
+        // source: the changelog above, served by the 'values' test connector
+        String sourceDdl =
+                "CREATE TABLE source_table ("
+                        + " rowkey INT,"
+                        + " family1 ROW<name STRING>,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'data-id' = '"
+                        + dataId
+                        + "',"
+                        + " 'changelog-mode'='I,UA,UB,D'"
+                        + ")";
+        tEnv.executeSql(sourceDdl);
+
+        // sink: the HBase table under test
+        String sinkDdl =
+                "CREATE TABLE sink_table ("
+                        + " rowkey INT,"
+                        + " family1 ROW<name STRING>,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_4
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")";
+        tEnv.executeSql(sinkDdl);
+
+        tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table").await();
+
+        TableResult scan = tEnv.executeSql("SELECT * FROM sink_table");
+        List<Row> rowsInHBase = CollectionUtil.iteratorToList(scan.collect());
+        Row expected = Row.of(1, Row.of("Hello3"));
+        assertThat(rowsInHBase).isEqualTo(Collections.singletonList(expected));
+    }
+
@Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 1e639ba..1301ee1 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -43,6 +43,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_1 = "testTable1";
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
+ protected static final String TEST_TABLE_4 = "testTable4";
protected static final String ROW_KEY = "rowkey";
@@ -92,6 +93,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
createHBaseTable1();
createHBaseTable2();
createHBaseTable3();
+ createHBaseTable4();
}
private static void createHBaseTable1() throws IOException {
@@ -232,6 +234,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
createTable(tableName, families, SPLIT_KEYS);
}
+    /**
+     * Creates {@code TEST_TABLE_4} with the single column family {@code FAMILY1}, passing the
+     * same {@code SPLIT_KEYS} as the other test tables.
+     */
+    private static void createHBaseTable4() {
+        byte[][] columnFamilies = {Bytes.toBytes(FAMILY1)};
+        TableName table4 = TableName.valueOf(TEST_TABLE_4);
+        createTable(table4, columnFamilies, SPLIT_KEYS);
+    }
+
private static Put putRow(
int rowKey,
int f1c1,
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 4dd30b3..0c4de1a 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -35,11 +35,16 @@ import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -71,7 +76,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
private final HBaseMutationConverter<T> mutationConverter;
private transient Connection connection;
- private transient BufferedMutator mutator;
+ private transient DeduplicatedMutator mutator;
private transient ScheduledExecutorService executor;
private transient ScheduledFuture scheduledFuture;
@@ -121,7 +126,9 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
if (bufferFlushMaxSizeInBytes > 0) {
params.writeBufferSize(bufferFlushMaxSizeInBytes);
}
- this.mutator = connection.getBufferedMutator(params);
+ this.mutator =
+ new DeduplicatedMutator(
+ (int) bufferFlushMaxMutations,
connection.getBufferedMutator(params));
if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1)
{
this.executor =
@@ -201,7 +208,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
}
private void flush() throws IOException {
- // BufferedMutator is thread-safe
+ // DeduplicatedMutator is thread-safe
mutator.flush();
numPendingRequests.set(0);
checkErrorAndRethrow();
@@ -256,4 +263,39 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, exception);
}
+
+    /**
+     * Thread-safe buffer that groups pending mutations by row key and keeps only the latest
+     * mutation for each row, so a single flush never submits more than one mutation per row to
+     * the underlying {@link BufferedMutator}. For background, see <a
+     * href="https://issues.apache.org/jira/browse/HBASE-8626">HBASE-8626</a>.
+     *
+     * <p>"Latest" is decided by {@link Mutation#getTimeStamp()}; on equal timestamps the mutation
+     * that arrived last wins.
+     */
+    private static class DeduplicatedMutator {
+
+        /** The HBase mutator that receives the deduplicated batch. */
+        private final BufferedMutator mutator;
+
+        /** Pending mutations keyed by row key; every access is synchronized on {@code this}. */
+        private final Map<ByteBuffer, Mutation> mutations;
+
+        /**
+         * @param size expected number of buffered rows, used only to presize the map; negative
+         *     values are treated as zero
+         * @param mutator the HBase mutator to delegate to
+         */
+        DeduplicatedMutator(int size, BufferedMutator mutator) {
+            this.mutator = mutator;
+            // The size hint may come from narrowing a long to int and can therefore be negative;
+            // HashMap rejects negative capacities, so clamp rather than fail the sink on open().
+            this.mutations = new HashMap<>(Math.max(size, 0));
+        }
+
+        /**
+         * Buffers {@code current}, replacing any pending mutation for the same row unless the
+         * pending one has a strictly newer timestamp.
+         */
+        synchronized void mutate(Mutation current) {
+            // Wrap the row bytes so map lookups compare row keys by content, not array identity.
+            ByteBuffer key = ByteBuffer.wrap(current.getRow());
+            Mutation old = mutations.get(key);
+            if (old == null || current.getTimeStamp() >= old.getTimeStamp()) {
+                mutations.put(key, current);
+            }
+        }
+
+        /**
+         * Submits all buffered mutations and flushes the underlying mutator. The buffer is
+         * cleared only after a successful flush; if submitting or flushing throws, the pending
+         * mutations stay buffered.
+         */
+        synchronized void flush() throws IOException {
+            mutator.mutate(new ArrayList<>(mutations.values()));
+            mutator.flush();
+            mutations.clear();
+        }
+
+        /**
+         * Submits any remaining buffered mutations and closes the underlying mutator. The
+         * underlying mutator is closed even if submitting the remaining mutations fails. In that
+         * case a failure from closing is attached as a suppressed exception.
+         */
+        synchronized void close() throws IOException {
+            try (BufferedMutator toClose = mutator) {
+                toClose.mutate(new ArrayList<>(mutations.values()));
+            } finally {
+                mutations.clear();
+            }
+        }
+    }
}