ferenc-csaky commented on code in PR #33:
URL: https://github.com/apache/flink-connector-hbase/pull/33#discussion_r1394135077


##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java:
##########
@@ -83,4 +84,38 @@ public static TimestampMetadata of(List<String> metadataKeys, DataType physicalD
                     pos + physicalDataType.getLogicalType().getChildren().size());
         }
     }
+
+    /** Time-to-live metadata for HBase. */
+    public static class TimeToLiveMetadata extends WritableMetadata<Long> {
+
+        public static final String KEY = "ttl";
+        public static final DataType DATA_TYPE = DataTypes.BIGINT().nullable();
+
+        private final int pos;
+
+        public TimeToLiveMetadata(int pos) {
+            this.pos = pos;
+        }
+
+        @Override
+        public Long read(RowData row) {
+            if (pos < 0) {
+                return null;
+            }
+            if (row.isNullAt(pos)) {
+                throw new IllegalArgumentException(
+                        String.format("Writable metadata '%s' can not accept null value", KEY));
+            }
+            return row.getLong(pos);
+        }
+
+        public static TimeToLiveMetadata of(List<String> metadataKeys, DataType physicalDataType) {

Review Comment:
   nit: I have nothing against static factory methods, but in this particular 
case we could simply have a single constructor that takes these two arguments, e.g.:
   ```java
   public TimeToLiveMetadata(List<String> metadataKeys, DataType physicalDataType) {
       int idx = metadataKeys.indexOf(KEY);
       pos = idx < 0 ? -1 : idx + physicalDataType.getLogicalType().getChildren().size();
   }
   ```
   At the moment the `TimeToLiveMetadata(int)` constructor is only used in the 
static factory, so this would save some code. The same applies to 
`TimestampMetadata`.
   
   But feel free to ignore this comment if you prefer it this way.
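   
   For illustration, here is the position arithmetic from that constructor in isolation, as a minimal sketch. `physicalFieldCount` is a hypothetical stand-in for `physicalDataType.getLogicalType().getChildren().size()`, and the `"timestamp"` key in `main` is only an example value, so the snippet compiles without Flink on the classpath:
   ```java
   import java.util.Arrays;
   import java.util.List;

   class MetadataPositionSketch {

       static final String KEY = "ttl";

       /**
        * Returns the row position of the metadata column, or -1 if the key is absent.
        * physicalFieldCount is an assumed stand-in for the number of physical columns.
        */
       static int position(List<String> metadataKeys, int physicalFieldCount) {
           int idx = metadataKeys.indexOf(KEY);
           // Metadata columns are laid out after the physical columns.
           return idx < 0 ? -1 : idx + physicalFieldCount;
       }

       public static void main(String[] args) {
           System.out.println(position(Arrays.asList("timestamp", "ttl"), 2));
           System.out.println(position(Arrays.asList("timestamp"), 2));
       }
   }
   ```
   The `-1` sentinel keeps the existing `pos < 0` early return in `read` working unchanged.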



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java:
##########
@@ -83,4 +84,38 @@ public static TimestampMetadata of(List<String> metadataKeys, DataType physicalD
                     pos + physicalDataType.getLogicalType().getChildren().size());
         }
     }
+
+    /** Time-to-live metadata for HBase. */
+    public static class TimeToLiveMetadata extends WritableMetadata<Long> {
+
+        public static final String KEY = "ttl";
+        public static final DataType DATA_TYPE = DataTypes.BIGINT().nullable();
+
+        private final int pos;
+
+        public TimeToLiveMetadata(int pos) {
+            this.pos = pos;
+        }
+
+        @Override
+        public Long read(RowData row) {
+            if (pos < 0) {
+                return null;
+            }
+            if (row.isNullAt(pos)) {
+                throw new IllegalArgumentException(
+                        String.format("Writable metadata '%s' can not accept null value", KEY));
+            }

Review Comment:
   This `null` check is now essentially duplicated, and it will probably be 
required for any new metadata type we add in the future. I think we could 
extract it into a `private static` method in `WritableMetadata`, something 
like `validateNotNull(RowData row, int pos, String key)`.
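   
   A rough sketch of how that helper could look, under some assumptions: `RowLike` and `rowOf` are hypothetical stand-ins for Flink's `RowData` so the snippet compiles on its own, and `readTtl` only mirrors the shape of `TimeToLiveMetadata#read`. In the connector the helper would take `RowData` and live in `WritableMetadata`:
   ```java
   class ValidateNotNullSketch {

       /** Hypothetical stand-in for the two RowData methods used here. */
       interface RowLike {
           boolean isNullAt(int pos);

           long getLong(int pos);
       }

       /** Shared null check, reusable by every metadata type. */
       static void validateNotNull(RowLike row, int pos, String key) {
           if (row.isNullAt(pos)) {
               throw new IllegalArgumentException(
                       String.format("Writable metadata '%s' can not accept null value", key));
           }
       }

       /** Mirrors the read logic of the TTL metadata after extracting the check. */
       static Long readTtl(RowLike row, int pos) {
           if (pos < 0) {
               return null; // metadata column not declared
           }
           validateNotNull(row, pos, "ttl");
           return row.getLong(pos);
       }

       /** Test helper (hypothetical): builds a row backed by boxed longs. */
       static RowLike rowOf(Long... values) {
           return new RowLike() {
               @Override
               public boolean isNullAt(int pos) {
                   return values[pos] == null;
               }

               @Override
               public long getLong(int pos) {
                   return values[pos];
               }
           };
       }

       public static void main(String[] args) {
           System.out.println(readTtl(rowOf(4000L, null), 0));
       }
   }
   ```
   Passing the key as a parameter keeps the error message identical for each metadata type while the check itself exists only once.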



##########
flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java:
##########
@@ -451,6 +452,60 @@ public void testTableSinkWithTimestampMetadata() throws Exception {
         TestBaseUtils.compareResultAsText(results, expected);
     }
 
+    @Test
+    public void testTableSinkWithTTLMetadata() throws Exception {
+        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForSink ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>,"
+                        + " ttl BIGINT NOT NULL METADATA FROM 'ttl'"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_6
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        String insert =
+                "INSERT INTO hTableForSink VALUES"
+                        + "(1, ROW(1), 4000),"
+                        + "(2, ROW(2), 3500),"
+                        + "(3, ROW(3), 5000)";
+        tEnv.executeSql(insert).await();
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForQuery ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_6
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+        String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
+
+        TableResult firstResult = tEnv.executeSql(query);
+        List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect());
+        String firstExpected = "+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n";
+        TestBaseUtils.compareResultAsText(firstResults, firstExpected);
+
+        TimeUnit.SECONDS.sleep(6);
+
+        TableResult lastResult = tEnv.executeSql(query);
+        List<Row> lastResults = CollectionUtil.iteratorToList(lastResult.collect());
+        assertThat(lastResults).isEmpty();

Review Comment:
   (Same for the hbase-1.4 test)
   
   I think it would be worth adding another read at a point when there are 
still some records in the table, e.g.:
   ```java
   TableResult tRes = tEnv.executeSql(query);
   List<Row> actualRows = CollectionUtil.iteratorToList(tRes.collect());
   String expectedRows = "+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n";
   TestBaseUtils.compareResultAsText(actualRows, expectedRows);
   
   Thread.sleep(4200);
   
   tRes = tEnv.executeSql(query);
   actualRows = CollectionUtil.iteratorToList(tRes.collect());
   expectedRows = "+I[3, 3]";
   TestBaseUtils.compareResultAsText(actualRows, expectedRows);
   
   Thread.sleep(2000);
   
   tRes = tEnv.executeSql(query);
   actualRows = CollectionUtil.iteratorToList(tRes.collect());
   assertThat(actualRows).isEmpty();
   ```


