This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/v3.0 by this push:
new d35b703 [FLINK-30460][Connector/HBase] Support the writable metadata TTL. This closes #33
d35b703 is described below
commit d35b703974a32cbb3a00d479ac186fcf73003a07
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Thu Nov 16 23:57:00 2023 +0800
[FLINK-30460][Connector/HBase] Support the writable metadata TTL. This closes #33
(cherry picked from commit 334bd965d299aca2fa597f08bdc89e0275562419)
---
docs/content.zh/docs/connectors/table/hbase.md | 6 ++
docs/content/docs/connectors/table/hbase.md | 6 ++
.../connector/hbase1/HBaseConnectorITCase.java | 69 ++++++++++++++++++++++
.../flink/connector/hbase1/util/HBaseTestBase.java | 9 +++
.../connector/hbase2/HBaseConnectorITCase.java | 69 ++++++++++++++++++++++
.../flink/connector/hbase2/util/HBaseTestBase.java | 9 +++
.../hbase/sink/RowDataToMutationConverter.java | 8 ++-
.../connector/hbase/sink/WritableMetadata.java | 44 ++++++++++----
.../flink/connector/hbase/util/HBaseSerde.java | 5 +-
.../flink/connector/hbase/util/HBaseSerdeTest.java | 4 +-
10 files changed, 212 insertions(+), 17 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/hbase.md
b/docs/content.zh/docs/connectors/table/hbase.md
index db68d37..8782e36 100644
--- a/docs/content.zh/docs/connectors/table/hbase.md
+++ b/docs/content.zh/docs/connectors/table/hbase.md
@@ -99,6 +99,12 @@ ON myTopic.key = hTable.rowkey;
<td>HBase记录的时间戳。</td>
<td><code>W</code></td>
</tr>
+ <tr>
+ <td><code>ttl</code></td>
+ <td><code>BIGINT NOT NULL</code></td>
+ <td>HBase记录的生存时间（毫秒），仅作用于写入（Put）的单元格，删除操作会忽略该值。</td>
+ <td><code>W</code></td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/docs/connectors/table/hbase.md
b/docs/content/docs/connectors/table/hbase.md
index 8c723da..85371fa 100644
--- a/docs/content/docs/connectors/table/hbase.md
+++ b/docs/content/docs/connectors/table/hbase.md
@@ -101,6 +101,12 @@ Read-only columns must be declared `VIRTUAL` to exclude
them during an `INSERT I
<td>Timestamp for the HBase mutation.</td>
<td><code>W</code></td>
</tr>
+ <tr>
+ <td><code>ttl</code></td>
+ <td><code>BIGINT NOT NULL</code></td>
+ <td>Time-to-live, in milliseconds, applied to the cells written by an HBase put. Ignored for deletes.</td>
+ <td><code>W</code></td>
+ </tr>
</tbody>
</table>
diff --git
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 1c53187..e924d58 100644
---
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -55,6 +55,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.Expressions.$;
@@ -422,6 +423,74 @@ public class HBaseConnectorITCase extends HBaseTestBase {
TestBaseUtils.compareResultAsText(results, expected);
}
+ @Test
+ public void testTableSinkWithTTLMetadata() throws Exception {
+ StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv,
streamSettings);
+
+ tEnv.executeSql(
+ "CREATE TABLE hTableForSink ("
+ + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ + " family1 ROW<col1 INT>,"
+ + " ttl BIGINT NOT NULL METADATA FROM 'ttl'"
+ + ") WITH ("
+ + " 'connector' = 'hbase-1.4',"
+ + " 'table-name' = '"
+ + TEST_TABLE_6
+ + "',"
+ + " 'zookeeper.quorum' = '"
+ + getZookeeperQuorum()
+ + "'"
+ + ")");
+
+ String insert =
+ "INSERT INTO hTableForSink VALUES"
+ + "(1, ROW(1), 2000),"
+ + "(2, ROW(2), 9000),"
+ + "(3, ROW(3), 5000)";
+ tEnv.executeSql(insert).await();
+
+ tEnv.executeSql(
+ "CREATE TABLE hTableForQuery ("
+ + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ + " family1 ROW<col1 INT>"
+ + ") WITH ("
+ + " 'connector' = 'hbase-1.4',"
+ + " 'table-name' = '"
+ + TEST_TABLE_6
+ + "',"
+ + " 'zookeeper.quorum' = '"
+ + getZookeeperQuorum()
+ + "'"
+ + ")");
+ String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
+
+ TableResult firstResult = tEnv.executeSql(query);
+ List<Row> firstResults =
CollectionUtil.iteratorToList(firstResult.collect());
+ String firstExpected = "+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n";
+ TestBaseUtils.compareResultAsText(firstResults, firstExpected);
+
+ TimeUnit.SECONDS.sleep(3);
+
+ TableResult secondResult = tEnv.executeSql(query);
+ List<Row> secondResults =
CollectionUtil.iteratorToList(secondResult.collect());
+ String secondExpected = "+I[2, 2]\n+I[3, 3]\n";
+ TestBaseUtils.compareResultAsText(secondResults, secondExpected);
+
+ TimeUnit.SECONDS.sleep(3);
+
+ TableResult thirdResult = tEnv.executeSql(query);
+ List<Row> thirdResults =
CollectionUtil.iteratorToList(thirdResult.collect());
+ String thirdExpected = "+I[2, 2]";
+ TestBaseUtils.compareResultAsText(thirdResults, thirdExpected);
+
+ TimeUnit.SECONDS.sleep(4);
+
+ TableResult lastResult = tEnv.executeSql(query);
+ List<Row> lastResults =
CollectionUtil.iteratorToList(lastResult.collect());
+ assertThat(lastResults).isEmpty();
+ }
+
@Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index c3512a9..05e01e9 100644
---
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -45,6 +45,7 @@ public abstract class HBaseTestBase extends
HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_3 = "testTable3";
protected static final String TEST_TABLE_4 = "testTable4";
protected static final String TEST_TABLE_5 = "testTable5";
+ protected static final String TEST_TABLE_6 = "testTable6";
protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
@@ -98,6 +99,7 @@ public abstract class HBaseTestBase extends
HBaseTestingClusterAutoStarter {
createHBaseTable3();
createHBaseTable4();
createHBaseTable5();
+ createHBaseTable6();
createEmptyHBaseTable();
}
@@ -253,6 +255,13 @@ public abstract class HBaseTestBase extends
HBaseTestingClusterAutoStarter {
createTable(tableName, families, SPLIT_KEYS);
}
+ private static void createHBaseTable6() {
+ // create a table
+ byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+ TableName tableName = TableName.valueOf(TEST_TABLE_6);
+ createTable(tableName, families, SPLIT_KEYS);
+ }
+
private static void createEmptyHBaseTable() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index c73bbc3..4bb6f2c 100644
---
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -60,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -451,6 +452,74 @@ public class HBaseConnectorITCase extends HBaseTestBase {
TestBaseUtils.compareResultAsText(results, expected);
}
+ @Test
+ public void testTableSinkWithTTLMetadata() throws Exception {
+ StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv,
streamSettings);
+
+ tEnv.executeSql(
+ "CREATE TABLE hTableForSink ("
+ + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ + " family1 ROW<col1 INT>,"
+ + " ttl BIGINT NOT NULL METADATA FROM 'ttl'"
+ + ") WITH ("
+ + " 'connector' = 'hbase-2.2',"
+ + " 'table-name' = '"
+ + TEST_TABLE_6
+ + "',"
+ + " 'zookeeper.quorum' = '"
+ + getZookeeperQuorum()
+ + "'"
+ + ")");
+
+ String insert =
+ "INSERT INTO hTableForSink VALUES"
+ + "(1, ROW(1), 2000),"
+ + "(2, ROW(2), 9000),"
+ + "(3, ROW(3), 5000)";
+ tEnv.executeSql(insert).await();
+
+ tEnv.executeSql(
+ "CREATE TABLE hTableForQuery ("
+ + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ + " family1 ROW<col1 INT>"
+ + ") WITH ("
+ + " 'connector' = 'hbase-2.2',"
+ + " 'table-name' = '"
+ + TEST_TABLE_6
+ + "',"
+ + " 'zookeeper.quorum' = '"
+ + getZookeeperQuorum()
+ + "'"
+ + ")");
+ String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
+
+ TableResult firstResult = tEnv.executeSql(query);
+ List<Row> firstResults =
CollectionUtil.iteratorToList(firstResult.collect());
+ String firstExpected = "+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n";
+ TestBaseUtils.compareResultAsText(firstResults, firstExpected);
+
+ TimeUnit.SECONDS.sleep(3);
+
+ TableResult secondResult = tEnv.executeSql(query);
+ List<Row> secondResults =
CollectionUtil.iteratorToList(secondResult.collect());
+ String secondExpected = "+I[2, 2]\n+I[3, 3]\n";
+ TestBaseUtils.compareResultAsText(secondResults, secondExpected);
+
+ TimeUnit.SECONDS.sleep(3);
+
+ TableResult thirdResult = tEnv.executeSql(query);
+ List<Row> thirdResults =
CollectionUtil.iteratorToList(thirdResult.collect());
+ String thirdExpected = "+I[2, 2]";
+ TestBaseUtils.compareResultAsText(thirdResults, thirdExpected);
+
+ TimeUnit.SECONDS.sleep(4);
+
+ TableResult lastResult = tEnv.executeSql(query);
+ List<Row> lastResults =
CollectionUtil.iteratorToList(lastResult.collect());
+ assertThat(lastResults).isEmpty();
+ }
+
@Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index d6a0a9e..3434179 100644
---
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -45,6 +45,7 @@ public abstract class HBaseTestBase extends
HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_3 = "testTable3";
protected static final String TEST_TABLE_4 = "testTable4";
protected static final String TEST_TABLE_5 = "testTable5";
+ protected static final String TEST_TABLE_6 = "testTable6";
protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
@@ -98,6 +99,7 @@ public abstract class HBaseTestBase extends
HBaseTestingClusterAutoStarter {
createHBaseTable3();
createHBaseTable4();
createHBaseTable5();
+ createHBaseTable6();
createEmptyHBaseTable();
}
@@ -253,6 +255,13 @@ public abstract class HBaseTestBase extends
HBaseTestingClusterAutoStarter {
createTable(tableName, families, SPLIT_KEYS);
}
+ private static void createHBaseTable6() {
+ // create a table
+ byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+ TableName tableName = TableName.valueOf(TEST_TABLE_6);
+ createTable(tableName, families, SPLIT_KEYS);
+ }
+
private static void createEmptyHBaseTable() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
index f9a13c8..5796d5c 100644
---
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
+++
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.hbase.sink;
+import
org.apache.flink.connector.hbase.sink.WritableMetadata.TimeToLiveMetadata;
import
org.apache.flink.connector.hbase.sink.WritableMetadata.TimestampMetadata;
import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
@@ -40,6 +41,7 @@ public class RowDataToMutationConverter implements
HBaseMutationConverter<RowDat
private final String nullStringLiteral;
private final boolean ignoreNullValue;
private final TimestampMetadata timestampMetadata;
+ private final TimeToLiveMetadata timeToLiveMetadata;
private transient HBaseSerde serde;
public RowDataToMutationConverter(
@@ -51,7 +53,8 @@ public class RowDataToMutationConverter implements
HBaseMutationConverter<RowDat
this.schema = schema;
this.nullStringLiteral = nullStringLiteral;
this.ignoreNullValue = ignoreNullValue;
- this.timestampMetadata = TimestampMetadata.of(metadataKeys,
physicalDataType);
+ this.timestampMetadata = new TimestampMetadata(metadataKeys,
physicalDataType);
+ this.timeToLiveMetadata = new TimeToLiveMetadata(metadataKeys,
physicalDataType);
}
@Override
@@ -62,9 +65,10 @@ public class RowDataToMutationConverter implements
HBaseMutationConverter<RowDat
@Override
public Mutation convertToMutation(RowData record) {
Long timestamp = timestampMetadata.read(record);
+ Long timeToLive = timeToLiveMetadata.read(record);
RowKind kind = record.getRowKind();
if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
- return serde.createPutMutation(record, timestamp);
+ return serde.createPutMutation(record, timestamp, timeToLive);
} else {
return serde.createDeleteMutation(record, timestamp);
}
diff --git
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
index c7e9e98..9b3ab36 100644
---
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
+++
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
@@ -35,6 +35,8 @@ public abstract class WritableMetadata<T> implements
Serializable {
private static final long serialVersionUID = 1L;
+ public abstract T read(RowData row);
+
/**
* Returns the map of metadata keys and their corresponding data types
that can be consumed by
* HBase sink for writing.
@@ -44,10 +46,16 @@ public abstract class WritableMetadata<T> implements
Serializable {
public static Map<String, DataType> list() {
Map<String, DataType> metadataMap = new HashMap<>();
metadataMap.put(TimestampMetadata.KEY, TimestampMetadata.DATA_TYPE);
+ metadataMap.put(TimeToLiveMetadata.KEY, TimeToLiveMetadata.DATA_TYPE);
return Collections.unmodifiableMap(metadataMap);
}
- public abstract T read(RowData row);
+ private static void validateNotNull(RowData row, int pos, String key) {
+ if (row.isNullAt(pos)) {
+ throw new IllegalArgumentException(
+ String.format("Writable metadata '%s' can not accept null
value", key));
+ }
+ }
/** Timestamp metadata for HBase. */
public static class TimestampMetadata extends WritableMetadata<Long> {
@@ -58,8 +66,9 @@ public abstract class WritableMetadata<T> implements
Serializable {
private final int pos;
- public TimestampMetadata(int pos) {
- this.pos = pos;
+ public TimestampMetadata(List<String> metadataKeys, DataType
physicalDataType) {
+ int idx = metadataKeys.indexOf(KEY);
+ this.pos = idx < 0 ? -1 : idx +
physicalDataType.getLogicalType().getChildren().size();
}
@Override
@@ -67,20 +76,31 @@ public abstract class WritableMetadata<T> implements
Serializable {
if (pos < 0) {
return HConstants.LATEST_TIMESTAMP;
}
- if (row.isNullAt(pos)) {
- throw new IllegalArgumentException(
- String.format("Writable metadata '%s' can not accept
null value", KEY));
- }
+ validateNotNull(row, pos, KEY);
return row.getTimestamp(pos, 3).getMillisecond();
}
+ }
+
+ /** Time-to-live metadata for HBase. */
+ public static class TimeToLiveMetadata extends WritableMetadata<Long> {
- public static TimestampMetadata of(List<String> metadataKeys, DataType
physicalDataType) {
- int pos = metadataKeys.indexOf(TimestampMetadata.KEY);
+ public static final String KEY = "ttl";
+ public static final DataType DATA_TYPE = DataTypes.BIGINT().nullable();
+
+ private final int pos;
+
+ public TimeToLiveMetadata(List<String> metadataKeys, DataType
physicalDataType) {
+ int idx = metadataKeys.indexOf(KEY);
+ this.pos = idx < 0 ? -1 : idx +
physicalDataType.getLogicalType().getChildren().size();
+ }
+
+ @Override
+ public Long read(RowData row) {
if (pos < 0) {
- return new TimestampMetadata(-1);
+ return null;
}
- return new TimestampMetadata(
- pos +
physicalDataType.getLogicalType().getChildren().size());
+ validateNotNull(row, pos, KEY);
+ return row.getLong(pos);
}
}
}
diff --git
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index d381033..362dc96 100644
---
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -135,7 +135,7 @@ public class HBaseSerde {
*
* @return The appropriate instance of Put for this use case.
*/
- public @Nullable Put createPutMutation(RowData row, long timestamp) {
+ public @Nullable Put createPutMutation(RowData row, long timestamp,
@Nullable Long timeToLive) {
checkArgument(keyEncoder != null, "row key is not set.");
byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
if (rowkey.length == 0) {
@@ -144,6 +144,9 @@ public class HBaseSerde {
}
// upsert
Put put = new Put(rowkey, timestamp);
+ if (timeToLive != null) {
+ put.setTTL(timeToLive);
+ }
for (int i = 0; i < fieldLength; i++) {
if (i != rowkeyIndex) {
int f = i > rowkeyIndex ? i - 1 : i;
diff --git
a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
index 85de7c8..7c46d04 100644
---
a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
+++
b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
@@ -106,7 +106,7 @@ class HBaseSerdeTest {
@Test
public void writeIgnoreNullValueTest() {
HBaseSerde serde = createHBaseSerde(false);
- Put m1 = serde.createPutMutation(prepareRowData(),
HConstants.LATEST_TIMESTAMP);
+ Put m1 = serde.createPutMutation(prepareRowData(),
HConstants.LATEST_TIMESTAMP, null);
assert m1 != null;
assertThat(m1.getRow()).isNotEmpty();
assertThat(m1.get(FAMILY1.getBytes(), F1COL1.getBytes())).isNotEmpty();
@@ -119,7 +119,7 @@ class HBaseSerdeTest {
HBaseSerde writeIgnoreNullValueSerde = createHBaseSerde(true);
Put m2 =
writeIgnoreNullValueSerde.createPutMutation(
- prepareRowData(), HConstants.LATEST_TIMESTAMP);
+ prepareRowData(), HConstants.LATEST_TIMESTAMP, null);
assert m2 != null;
assertThat(m2.getRow()).isNotEmpty();
assertThat(m2.get(FAMILY1.getBytes(), F1COL1.getBytes())).isEmpty();