This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/main by this push:
new 0014377 [FLINK-33206] Verify the existence of HBase table before read/write. This closes #22
0014377 is described below
commit 00143773ba3f647099b7f53c17133fef99ab8fed
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Fri Nov 10 16:21:55 2023 +0800
[FLINK-33206] Verify the existence of HBase table before read/write. This closes #22
Co-authored-by: Tan-JiaLiang <[email protected]>
---
.../hbase1/source/HBaseRowDataInputFormat.java | 11 +++--
.../connector/hbase1/HBaseConnectorITCase.java | 48 ++++++++++++++++++++++
.../flink/connector/hbase1/util/HBaseTestBase.java | 1 +
.../hbase2/source/HBaseRowDataInputFormat.java | 18 ++++----
.../connector/hbase2/HBaseConnectorITCase.java | 47 +++++++++++++++++++++
.../flink/connector/hbase2/util/HBaseTestBase.java | 1 +
.../connector/hbase/sink/HBaseSinkFunction.java | 9 +++-
7 files changed, 117 insertions(+), 18 deletions(-)
diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java
index d48c2fa..21ce8a4 100644
--- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java
+++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseRowDataInputFormat.java
@@ -86,12 +86,11 @@ public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
}
private void connectToTable() throws IOException {
- try {
- connection = ConnectionFactory.createConnection(getHadoopConfiguration());
- table = (HTable) connection.getTable(TableName.valueOf(tableName));
- } catch (TableNotFoundException tnfe) {
- LOG.error("The table " + tableName + " not found ", tnfe);
- throw new RuntimeException("HBase table '" + tableName + "' not found.", tnfe);
+ connection = ConnectionFactory.createConnection(getHadoopConfiguration());
+ TableName name = TableName.valueOf(tableName);
+ if (!connection.getAdmin().tableExists(name)) {
+ throw new TableNotFoundException("HBase table '" + tableName + "' not found.");
}
+ table = (HTable) connection.getTable(name);
}
}
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 0ea0002..399238d 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -21,6 +21,10 @@ package org.apache.flink.connector.hbase1;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
+import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase1.source.AbstractTableInputFormat;
import org.apache.flink.connector.hbase1.source.HBaseRowDataInputFormat;
@@ -31,6 +35,7 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.TestBaseUtils;
@@ -38,7 +43,9 @@ import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
@@ -52,6 +59,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.api.Expressions.$;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -493,6 +501,46 @@ public class HBaseConnectorITCase extends HBaseTestBase {
assertNull(inputFormat.getConnection());
}
+ @Test
+ public void testTableInputFormatTableExistence() throws IOException {
+ HBaseTableSchema tableSchema = new HBaseTableSchema();
+ tableSchema.addColumn(FAMILY1, F1COL1, byte[].class);
+ AbstractTableInputFormat<?> inputFormat =
+ new HBaseRowDataInputFormat(getConf(), TEST_NOT_EXISTS_TABLE, tableSchema, "null");
+
+ assertThatThrownBy(() -> inputFormat.createInputSplits(1))
+ .isExactlyInstanceOf(TableNotFoundException.class);
+
+ inputFormat.close();
+ assertNull(inputFormat.getConnection());
+ }
+
+ @Test
+ public void testHBaseSinkFunctionTableExistence() throws Exception {
+ org.apache.hadoop.conf.Configuration hbaseConf =
+ HBaseConfigurationUtil.getHBaseConfiguration();
+ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getZookeeperQuorum());
+ hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
+
+ HBaseTableSchema tableSchema = new HBaseTableSchema();
+ tableSchema.addColumn(FAMILY1, F1COL1, byte[].class);
+
+ HBaseSinkFunction<RowData> sinkFunction =
+ new HBaseSinkFunction<>(
+ TEST_NOT_EXISTS_TABLE,
+ hbaseConf,
+ new RowDataToMutationConverter(tableSchema, "null", false),
+ 2 * 1024 * 1024,
+ 1000,
+ 1000);
+
+ assertThatThrownBy(() -> sinkFunction.open(new Configuration()))
+ .getRootCause()
+ .isExactlyInstanceOf(TableNotFoundException.class);
+
+ sinkFunction.close();
+ }
+
// -------------------------------------------------------------------------------------
// HBase lookup source tests
// -------------------------------------------------------------------------------------
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index 86110c8..1576f80 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
protected static final String TEST_TABLE_4 = "testTable4";
+ protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
protected static final String ROW_KEY = "rowkey";
diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java
index 1bd9a6a..f74eb85 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataInputFormat.java
@@ -85,16 +85,14 @@ public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
}
private void connectToTable() throws IOException {
- try {
- if (connection == null) {
- connection = ConnectionFactory.createConnection(getHadoopConfiguration());
- }
- TableName name = TableName.valueOf(getTableName());
- table = connection.getTable(name);
- regionLocator = connection.getRegionLocator(name);
- } catch (TableNotFoundException tnfe) {
- LOG.error("The table " + tableName + " not found ", tnfe);
- throw new RuntimeException("HBase table '" + tableName + "' not found.", tnfe);
+ if (connection == null) {
+ connection = ConnectionFactory.createConnection(getHadoopConfiguration());
}
+ TableName name = TableName.valueOf(tableName);
+ if (!connection.getAdmin().tableExists(name)) {
+ throw new TableNotFoundException("HBase table '" + tableName + "' not found.");
+ }
+ table = connection.getTable(name);
+ regionLocator = connection.getRegionLocator(name);
}
}
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index 64a9875..4ebc50f 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
+import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase2.source.AbstractTableInputFormat;
import org.apache.flink.connector.hbase2.source.HBaseRowDataInputFormat;
@@ -33,6 +36,7 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -41,7 +45,9 @@ import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
@@ -59,6 +65,7 @@ import java.util.stream.StreamSupport;
import static org.apache.flink.table.api.Expressions.$;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -478,6 +485,46 @@ public class HBaseConnectorITCase extends HBaseTestBase {
assertNull(inputFormat.getConnection());
}
+ @Test
+ public void testTableInputFormatTableExistence() throws IOException {
+ HBaseTableSchema tableSchema = new HBaseTableSchema();
+ tableSchema.addColumn(FAMILY1, F1COL1, byte[].class);
+ AbstractTableInputFormat<?> inputFormat =
+ new HBaseRowDataInputFormat(getConf(), TEST_NOT_EXISTS_TABLE, tableSchema, "null");
+
+ assertThatThrownBy(() -> inputFormat.createInputSplits(1))
+ .isExactlyInstanceOf(TableNotFoundException.class);
+
+ inputFormat.close();
+ assertNull(inputFormat.getConnection());
+ }
+
+ @Test
+ public void testHBaseSinkFunctionTableExistence() throws Exception {
+ org.apache.hadoop.conf.Configuration hbaseConf =
+ HBaseConfigurationUtil.getHBaseConfiguration();
+ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getZookeeperQuorum());
+ hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
+
+ HBaseTableSchema tableSchema = new HBaseTableSchema();
+ tableSchema.addColumn(FAMILY1, F1COL1, byte[].class);
+
+ HBaseSinkFunction<RowData> sinkFunction =
+ new HBaseSinkFunction<>(
+ TEST_NOT_EXISTS_TABLE,
+ hbaseConf,
+ new RowDataToMutationConverter(tableSchema, "null", false),
+ 2 * 1024 * 1024,
+ 1000,
+ 1000);
+
+ assertThatThrownBy(() -> sinkFunction.open(new Configuration()))
+ .getRootCause()
+ .isExactlyInstanceOf(TableNotFoundException.class);
+
+ sinkFunction.close();
+ }
+
private void verifyHBaseLookupJoin(boolean async) {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 1301ee1..621143d 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
protected static final String TEST_TABLE_4 = "testTable4";
+ protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
protected static final String ROW_KEY = "rowkey";
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 0c4de1a..0ffad05 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -120,9 +120,14 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
if (null == connection) {
this.connection = ConnectionFactory.createConnection(config);
}
+
+ TableName tableName = TableName.valueOf(hTableName);
+ if (!connection.getAdmin().tableExists(tableName)) {
+ throw new TableNotFoundException(tableName);
+ }
+
// create a parameter instance, set the table name and custom listener reference.
- BufferedMutatorParams params =
- new BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this);
+ BufferedMutatorParams params = new BufferedMutatorParams(tableName).listener(this);
if (bufferFlushMaxSizeInBytes > 0) {
params.writeBufferSize(bufferFlushMaxSizeInBytes);
}