lirui-apache commented on a change in pull request #13315:
URL: https://github.com/apache/flink/pull/13315#discussion_r483604044
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
##########
@@ -328,4 +330,16 @@ public void setStaticPartition(Map<String, String>
partitionSpec) {
public void setOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
+
+ private void checkAcidTable() {
+ if (catalogTable != null && catalogTable.getOptions() != null) {
+ String tableIsTransactional =
catalogTable.getOptions().get("transactional");
+ if (tableIsTransactional == null) {
+ tableIsTransactional =
catalogTable.getOptions().get("transactional".toUpperCase());
+ }
+ if (tableIsTransactional != null &&
tableIsTransactional.equalsIgnoreCase("true")) {
+ throw new TableException(String.format("Cannot
write on the ACID table %s.", identifier.asSummaryString()));
Review comment:
```suggestion
				throw new FlinkHiveException(String.format("Writing ACID table %s is not supported", identifier.asSummaryString()));
```
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
##########
@@ -330,6 +332,18 @@ public void setOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
+ private void checkAcidTable() {
+ if (catalogTable != null && catalogTable.getOptions() != null) {
+ String tableIsTransactional =
catalogTable.getOptions().get("transactional");
+ if (tableIsTransactional == null) {
+ tableIsTransactional =
catalogTable.getOptions().get("transactional".toUpperCase());
+ }
+ if (tableIsTransactional != null &&
tableIsTransactional.equalsIgnoreCase("true")) {
Review comment:
Let's extract this transactional-table check into a shared util method — the same logic is duplicated in both `HiveTableSink` and `HiveTableSource`.
##########
File path:
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
##########
@@ -574,6 +575,43 @@ public void testStreamCompressTextTable() throws Exception
{
testCompressTextTable(false);
}
+ private void testTransactionalTable(boolean batch) {
+ TableEnvironment tableEnv = batch ?
+ getTableEnvWithHiveCatalog() :
+ getStreamTableEnvWithHiveCatalog();
+ tableEnv.executeSql("create database db1");
+ try {
+ tableEnv.executeSql("create table db1.src (x string,y
string)");
+ hiveShell.execute("create table db1.dest (x string,y
string) clustered by (x) into 3 buckets stored as orc tblproperties
('transactional'='true')");
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1",
"src")
+ .addRow(new Object[]{"a", "b"})
+ .addRow(new Object[]{"c", "d"})
+ .commit();
Review comment:
No need to insert data into the source table — creating the tables is enough to trigger the ACID-table check under test.
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
##########
@@ -556,4 +558,16 @@ public String explainSource() {
public boolean isAsyncEnabled() {
return false;
}
+
+ private void checkAcidTable() {
+ if (catalogTable != null && catalogTable.getOptions() != null) {
+ String tableIsTransactional =
catalogTable.getOptions().get("transactional");
+ if (tableIsTransactional == null) {
+ tableIsTransactional =
catalogTable.getOptions().get("transactional".toUpperCase());
+ }
+ if (tableIsTransactional != null &&
tableIsTransactional.equalsIgnoreCase("true")) {
+ throw new TableException(String.format("Cannot
read on the ACID table %s.", tablePath));
Review comment:
```suggestion
				throw new FlinkHiveException(String.format("Reading ACID table %s is not supported", tablePath));
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]