This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/main by this push:
new 298d816 [FLINK-33164] Support write option sink.ignore-null-valus.
This closes #21
298d816 is described below
commit 298d8164495732f59d18c54d4d40b601b6d44f21
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Fri Nov 3 23:51:29 2023 +0800
[FLINK-33164] Support write option sink.ignore-null-valus. This closes #21
Co-authored-by: tanjialiang <[email protected]>
---
.../hbase1/HBase1DynamicTableFactory.java | 3 ++
.../hbase1/sink/HBaseDynamicTableSink.java | 5 +-
.../hbase1/HBaseDynamicTableFactoryTest.java | 12 +++++
.../hbase2/HBase2DynamicTableFactory.java | 5 +-
.../hbase2/sink/HBaseDynamicTableSink.java | 5 +-
.../hbase2/HBaseDynamicTableFactoryTest.java | 12 +++++
.../connector/hbase/options/HBaseWriteOptions.java | 20 +++++++
.../hbase/sink/RowDataToMutationConverter.java | 7 ++-
.../hbase/table/HBaseConnectorOptions.java | 6 +++
.../hbase/table/HBaseConnectorOptionsUtil.java | 2 +
.../flink/connector/hbase/util/HBaseSerde.java | 15 ++++++
.../flink/connector/hbase/util/HBaseSerdeTest.java | 62 ++++++++++++++++++++--
12 files changed, 144 insertions(+), 10 deletions(-)
diff --git
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index fbc793c..5321bf2 100644
---
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -50,6 +50,7 @@ import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.NULL_
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
@@ -149,6 +150,7 @@ public class HBase1DynamicTableFactory
set.add(SINK_BUFFER_FLUSH_MAX_SIZE);
set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
set.add(SINK_BUFFER_FLUSH_INTERVAL);
+ set.add(SINK_IGNORE_NULL_VALUE);
set.add(SINK_PARALLELISM);
set.add(LOOKUP_ASYNC);
set.add(LOOKUP_CACHE_MAX_ROWS);
@@ -173,6 +175,7 @@ public class HBase1DynamicTableFactory
SINK_BUFFER_FLUSH_MAX_SIZE,
SINK_BUFFER_FLUSH_MAX_ROWS,
SINK_BUFFER_FLUSH_INTERVAL,
+ SINK_IGNORE_NULL_VALUE,
LOOKUP_CACHE_MAX_ROWS,
LOOKUP_CACHE_TTL,
LOOKUP_MAX_RETRIES)
diff --git
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
index 2b9e87c..0dec937 100644
---
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
+++
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
@@ -61,7 +61,10 @@ public class HBaseDynamicTableSink implements
DynamicTableSink {
new HBaseSinkFunction<>(
tableName,
hbaseConf,
- new RowDataToMutationConverter(hbaseTableSchema,
nullStringLiteral),
+ new RowDataToMutationConverter(
+ hbaseTableSchema,
+ nullStringLiteral,
+ writeOptions.isIgnoreNullValue()),
writeOptions.getBufferFlushMaxSizeInBytes(),
writeOptions.getBufferFlushMaxRows(),
writeOptions.getBufferFlushIntervalMillis());
diff --git
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
index 8a8f1d7..7c4ed8e 100644
---
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
+++
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
@@ -256,6 +256,18 @@ public class HBaseDynamicTableFactoryTest {
assertEquals(expected, actual);
}
+ @Test
+ public void testSinkIgnoreNullValueOptions() {
+ Map<String, String> options = getAllOptions();
+ options.put("sink.ignore-null-value", "true");
+
+ ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY,
STRING()));
+
+ DynamicTableSink sink = createTableSink(schema, options);
+ HBaseWriteOptions actual = ((HBaseDynamicTableSink)
sink).getWriteOptions();
+ assertThat(actual.isIgnoreNullValue()).isTrue();
+ }
+
@Test
public void testParallelismOptions() {
Map<String, String> options = getAllOptions();
diff --git
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index 8c55137..8a10a1c 100644
---
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -50,6 +50,7 @@ import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.NULL_
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
@@ -152,6 +153,7 @@ public class HBase2DynamicTableFactory
set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
set.add(SINK_BUFFER_FLUSH_INTERVAL);
set.add(SINK_PARALLELISM);
+ set.add(SINK_IGNORE_NULL_VALUE);
set.add(LOOKUP_ASYNC);
set.add(LOOKUP_CACHE_MAX_ROWS);
set.add(LOOKUP_CACHE_TTL);
@@ -177,7 +179,8 @@ public class HBase2DynamicTableFactory
LOOKUP_MAX_RETRIES,
SINK_BUFFER_FLUSH_MAX_SIZE,
SINK_BUFFER_FLUSH_MAX_ROWS,
- SINK_BUFFER_FLUSH_INTERVAL)
+ SINK_BUFFER_FLUSH_INTERVAL,
+ SINK_IGNORE_NULL_VALUE)
.collect(Collectors.toSet());
}
}
diff --git
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
index 6ea9ba3..299a457 100644
---
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
+++
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
@@ -62,7 +62,10 @@ public class HBaseDynamicTableSink implements
DynamicTableSink {
new HBaseSinkFunction<>(
tableName,
hbaseConf,
- new RowDataToMutationConverter(hbaseTableSchema,
nullStringLiteral),
+ new RowDataToMutationConverter(
+ hbaseTableSchema,
+ nullStringLiteral,
+ writeOptions.isIgnoreNullValue()),
writeOptions.getBufferFlushMaxSizeInBytes(),
writeOptions.getBufferFlushMaxRows(),
writeOptions.getBufferFlushIntervalMillis());
diff --git
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
index a535f0e..baf068a 100644
---
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
+++
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
@@ -260,6 +260,18 @@ public class HBaseDynamicTableFactoryTest {
assertEquals(expected, actual);
}
+ @Test
+ public void testSinkIgnoreNullValueOptions() {
+ Map<String, String> options = getAllOptions();
+ options.put("sink.ignore-null-value", "true");
+
+ ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY,
STRING()));
+
+ DynamicTableSink sink = createTableSink(schema, options);
+ HBaseWriteOptions actual = ((HBaseDynamicTableSink)
sink).getWriteOptions();
+ assertThat(actual.isIgnoreNullValue()).isTrue();
+ }
+
@Test
public void testParallelismOptions() {
Map<String, String> options = getAllOptions();
diff --git
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
index 0ba2f88..9462783 100644
---
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
+++
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
@@ -34,16 +34,19 @@ public class HBaseWriteOptions implements Serializable {
private final long bufferFlushMaxSizeInBytes;
private final long bufferFlushMaxRows;
private final long bufferFlushIntervalMillis;
+ private final boolean ignoreNullValue;
private final Integer parallelism;
private HBaseWriteOptions(
long bufferFlushMaxSizeInBytes,
long bufferFlushMaxMutations,
long bufferFlushIntervalMillis,
+ boolean ignoreNullValue,
Integer parallelism) {
this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
this.bufferFlushMaxRows = bufferFlushMaxMutations;
this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+ this.ignoreNullValue = ignoreNullValue;
this.parallelism = parallelism;
}
@@ -59,6 +62,10 @@ public class HBaseWriteOptions implements Serializable {
return bufferFlushIntervalMillis;
}
+ public boolean isIgnoreNullValue() {
+ return ignoreNullValue;
+ }
+
public Integer getParallelism() {
return parallelism;
}
@@ -72,6 +79,8 @@ public class HBaseWriteOptions implements Serializable {
+ bufferFlushMaxRows
+ ", bufferFlushIntervalMillis="
+ bufferFlushIntervalMillis
+ + ", ignoreNullValue="
+ + ignoreNullValue
+ ", parallelism="
+ parallelism
+ '}';
@@ -89,6 +98,7 @@ public class HBaseWriteOptions implements Serializable {
return bufferFlushMaxSizeInBytes == that.bufferFlushMaxSizeInBytes
&& bufferFlushMaxRows == that.bufferFlushMaxRows
&& bufferFlushIntervalMillis == that.bufferFlushIntervalMillis
+ && ignoreNullValue == that.ignoreNullValue
&& parallelism == that.parallelism;
}
@@ -112,6 +122,7 @@ public class HBaseWriteOptions implements Serializable {
private long bufferFlushMaxSizeInBytes =
ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
private long bufferFlushMaxRows = 0;
private long bufferFlushIntervalMillis = 0;
+ private boolean ignoreNullValue;
private Integer parallelism;
/**
@@ -141,6 +152,14 @@ public class HBaseWriteOptions implements Serializable {
return this;
}
+ /**
+ * Optional. Sets whether ignore null value or not. By defaults, null
value will be writing.
+ */
+ public Builder setIgnoreNullValue(boolean ignoreNullValue) {
+ this.ignoreNullValue = ignoreNullValue;
+ return this;
+ }
+
/**
* Optional. Defines the parallelism of the HBase sink operator. By
default, the parallelism
* is determined by the framework using the same parallelism of the
upstream chained
@@ -157,6 +176,7 @@ public class HBaseWriteOptions implements Serializable {
bufferFlushMaxSizeInBytes,
bufferFlushMaxRows,
bufferFlushIntervalMillis,
+ ignoreNullValue,
parallelism);
}
}
diff --git
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
index 406a996..f07377c 100644
---
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
+++
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
@@ -34,16 +34,19 @@ public class RowDataToMutationConverter implements
HBaseMutationConverter<RowDat
private final HBaseTableSchema schema;
private final String nullStringLiteral;
+ private final boolean ignoreNullValue;
private transient HBaseSerde serde;
- public RowDataToMutationConverter(HBaseTableSchema schema, final String
nullStringLiteral) {
+ public RowDataToMutationConverter(
+ HBaseTableSchema schema, final String nullStringLiteral, boolean
ignoreNullValue) {
this.schema = schema;
this.nullStringLiteral = nullStringLiteral;
+ this.ignoreNullValue = ignoreNullValue;
}
@Override
public void open() {
- this.serde = new HBaseSerde(schema, nullStringLiteral);
+ this.serde = new HBaseSerde(schema, nullStringLiteral,
ignoreNullValue);
}
@Override
diff --git
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
index 0c8dc97..d760c03 100644
---
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
+++
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
@@ -88,6 +88,12 @@ public class HBaseConnectorOptions {
+ "Can be set to '0' to disable it. Note,
both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
+ "can be set to '0' with the flush
interval set allowing for complete async processing of buffered actions.");
+ public static final ConfigOption<Boolean> SINK_IGNORE_NULL_VALUE =
+ ConfigOptions.key("sink.ignore-null-value")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Writing option, whether ignore null
value or not.");
+
public static final ConfigOption<Boolean> LOOKUP_ASYNC =
ConfigOptions.key("lookup.async")
.booleanType()
diff --git
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
index 2141fe1..482644f 100644
---
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
+++
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
@@ -34,6 +34,7 @@ import java.util.Properties;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
@@ -89,6 +90,7 @@ public class HBaseConnectorOptionsUtil {
builder.setBufferFlushMaxRows(tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setBufferFlushMaxSizeInBytes(
tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
+ builder.setIgnoreNullValue(tableOptions.get(SINK_IGNORE_NULL_VALUE));
builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null));
return builder.build();
}
diff --git
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index d21cc4a..458b25d 100644
---
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -57,6 +57,8 @@ public class HBaseSerde {
private final byte[] nullStringBytes;
+ private final boolean writeIgnoreNullValue;
+
// row key index in output row
private final int rowkeyIndex;
@@ -77,6 +79,13 @@ public class HBaseSerde {
private final GenericRowData rowWithRowKey;
public HBaseSerde(HBaseTableSchema hbaseSchema, final String
nullStringLiteral) {
+ this(hbaseSchema, nullStringLiteral, false);
+ }
+
+ public HBaseSerde(
+ HBaseTableSchema hbaseSchema,
+ final String nullStringLiteral,
+ boolean writeIgnoreNullValue) {
this.families = hbaseSchema.getFamilyKeys();
this.rowkeyIndex = hbaseSchema.getRowKeyIndex();
LogicalType rowkeyType =
@@ -93,6 +102,7 @@ public class HBaseSerde {
this.keyDecoder = null;
}
this.nullStringBytes =
nullStringLiteral.getBytes(StandardCharsets.UTF_8);
+ this.writeIgnoreNullValue = writeIgnoreNullValue;
// prepare output rows
this.reusedRow = new GenericRowData(fieldLength);
@@ -141,6 +151,11 @@ public class HBaseSerde {
byte[] familyKey = families[f];
RowData familyRow = row.getRow(i, qualifiers[f].length);
for (int q = 0; q < this.qualifiers[f].length; q++) {
+ // ignore null value or not
+ if (writeIgnoreNullValue && familyRow.isNullAt(q)) {
+ continue;
+ }
+
// get quantifier key
byte[] qualifier = qualifiers[f][q];
// serialize value
diff --git
a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
index 3e9afb1..e370809 100644
---
a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
+++
b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
@@ -19,11 +19,14 @@
package org.apache.flink.connector.hbase.util;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.Test;
@@ -58,7 +61,7 @@ class HBaseSerdeTest {
@Test
void convertToNewRowTest() {
- HBaseSerde serde = createHBaseSerde();
+ HBaseSerde serde = createHBaseSerde(false);
List<List<Cell>> cellsList = prepareCells();
List<RowData> resultRowDatas = new ArrayList<>();
List<String> resultRowDataStr = new ArrayList<>();
@@ -79,7 +82,7 @@ class HBaseSerdeTest {
@Test
void convertToReusedRowTest() {
- HBaseSerde serde = createHBaseSerde();
+ HBaseSerde serde = createHBaseSerde(false);
List<List<Cell>> cellsList = prepareCells();
List<RowData> resultRowDatas = new ArrayList<>();
List<String> resultRowDataStr = new ArrayList<>();
@@ -99,7 +102,32 @@ class HBaseSerdeTest {
"+I(2,+I(20),+I(Hello-2,200),+I(2.02,true,Welt-2))");
}
- private HBaseSerde createHBaseSerde() {
+ @Test
+ public void writeIgnoreNullValueTest() {
+ HBaseSerde serde = createHBaseSerde(false);
+ Put m1 = serde.createPutMutation(prepareRowData());
+ assert m1 != null;
+ assertThat(m1.getRow()).isNotEmpty();
+ assertThat(m1.get(FAMILY1.getBytes(), F1COL1.getBytes())).isNotEmpty();
+ assertThat(m1.get(FAMILY2.getBytes(), F2COL1.getBytes())).isNotEmpty();
+ assertThat(m1.get(FAMILY2.getBytes(), F2COL2.getBytes())).isNotEmpty();
+ assertThat(m1.get(FAMILY3.getBytes(), F3COL1.getBytes())).isNotEmpty();
+ assertThat(m1.get(FAMILY3.getBytes(), F3COL2.getBytes())).isNotEmpty();
+ assertThat(m1.get(FAMILY3.getBytes(), F3COL3.getBytes())).isNotEmpty();
+
+ HBaseSerde writeIgnoreNullValueSerde = createHBaseSerde(true);
+ Put m2 = writeIgnoreNullValueSerde.createPutMutation(prepareRowData());
+ assert m2 != null;
+ assertThat(m2.getRow()).isNotEmpty();
+ assertThat(m2.get(FAMILY1.getBytes(), F1COL1.getBytes())).isEmpty();
+ assertThat(m2.get(FAMILY2.getBytes(), F2COL1.getBytes())).isNotEmpty();
+ assertThat(m2.get(FAMILY2.getBytes(), F2COL2.getBytes())).isEmpty();
+ assertThat(m2.get(FAMILY3.getBytes(), F2COL1.getBytes())).isNotEmpty();
+ assertThat(m2.get(FAMILY3.getBytes(), F3COL2.getBytes())).isNotEmpty();
+ assertThat(m2.get(FAMILY3.getBytes(), F3COL3.getBytes())).isEmpty();
+ }
+
+ private HBaseTableSchema createHBaseTableSchema() {
DataType dataType =
ROW(
FIELD(ROW_KEY, INT()),
@@ -111,8 +139,11 @@ class HBaseSerdeTest {
FIELD(F3COL1, DOUBLE()),
FIELD(F3COL2, DataTypes.BOOLEAN()),
FIELD(F3COL3, STRING()))));
- HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
- return new HBaseSerde(hbaseSchema, "null");
+ return HBaseTableSchema.fromDataType(dataType);
+ }
+
+ private HBaseSerde createHBaseSerde(boolean writeIgnoreNullValue) {
+ return new HBaseSerde(createHBaseTableSchema(), "null",
writeIgnoreNullValue);
}
private List<List<Cell>> prepareCells() {
@@ -163,4 +194,25 @@ class HBaseSerdeTest {
cellList.add(cells2);
return cellList;
}
+
+ private RowData prepareRowData() {
+ GenericRowData fam1Row = new GenericRowData(1);
+ fam1Row.setField(0, null);
+
+ GenericRowData fam2Row = new GenericRowData(2);
+ fam2Row.setField(0, StringData.fromString("Hello-1"));
+ fam2Row.setField(1, null);
+
+ GenericRowData fam3Row = new GenericRowData(3);
+ fam3Row.setField(0, 2.02);
+ fam3Row.setField(1, true);
+ fam3Row.setField(2, null);
+
+ GenericRowData row = new GenericRowData(4);
+ row.setField(0, 10);
+ row.setField(1, fam1Row);
+ row.setField(2, fam2Row);
+ row.setField(3, fam3Row);
+ return row;
+ }
}