This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new cda702319f4 [fix](hive)fix hive insert only transaction table.(#45753)(#46385) (#46454)
cda702319f4 is described below
commit cda702319f4b47582ce38d3b906aaf1e5675d05d
Author: daidai <[email protected]>
AuthorDate: Tue Jan 7 10:11:53 2025 +0800
[fix](hive)fix hive insert only transaction table.(#45753)(#46385) (#46454)
### What problem does this PR solve?
bp #45753: fix reading of Hive insert-only transactional tables.
bp #46385, #45999: fix the unstable test cases introduced by #45753.
---
.../scripts/create_preinstalled_scripts/run25.hql | 53 ++++++++++++++
.../doris/datasource/hive/HMSExternalTable.java | 16 ++--
.../doris/datasource/hive/HiveMetaStoreCache.java | 49 +++++++------
.../doris/datasource/hive/HiveMetadataOps.java | 25 +++++++
.../org/apache/doris/datasource/hive/HiveUtil.java | 22 ++++++
.../doris/datasource/hive/source/HiveScanNode.java | 11 ++-
.../commands/insert/InsertIntoTableCommand.java | 3 +
.../hive/test_transactional_hive.out | 46 ++++++++++++
.../hive/test_hive_translation_insert_only.out | 20 +++++
.../hive/test_transactional_hive.groovy | 70 ++++++++++++++++++
.../hive/test_hive_translation_insert_only.groovy | 85 ++++++++++++++++++++++
11 files changed, 371 insertions(+), 29 deletions(-)
diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
index 814df4cdc5f..66e73f51df8 100755
--- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
+++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
@@ -41,3 +41,56 @@ insert into orc_full_acid_par PARTITION(part_col=20230102)
values
(6, 'F');
update orc_full_acid_par set value = 'BB' where id = 2;
+
+
+
+
+create table orc_to_acid_tb (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC;
+INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
+INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B');
+ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true');
+
+
+create table orc_to_acid_compacted_tb (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC;
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 'B');
+ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional'='true');
+ALTER TABLE orc_to_acid_compacted_tb partition(part_col='101') COMPACT 'major' and wait;
+ALTER TABLE orc_to_acid_compacted_tb partition(part_col='102') COMPACT 'major' and wait;
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 'D');
+update orc_to_acid_compacted_tb set value = "CC" where id = 3;
+update orc_to_acid_compacted_tb set value = "BB" where id = 2;
+
+
+create table orc_acid_minor (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+insert into orc_acid_minor values (1, 'A');
+insert into orc_acid_minor values (2, 'B');
+insert into orc_acid_minor values (3, 'C');
+update orc_acid_minor set value = "BB" where id = 2;
+ALTER TABLE orc_acid_minor COMPACT 'minor' and wait;
+insert into orc_acid_minor values (4, 'D');
+update orc_acid_minor set value = "DD" where id = 4;
+DELETE FROM orc_acid_minor WHERE id = 3;
+
+
+create table orc_acid_major (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+insert into orc_acid_major values (1, 'A');
+insert into orc_acid_major values (2, 'B');
+insert into orc_acid_major values (3, 'C');
+update orc_acid_major set value = "BB" where id = 2;
+ALTER TABLE orc_acid_major COMPACT 'minor' and wait;
+insert into orc_acid_major values (4, 'D');
+update orc_acid_major set value = "DD" where id = 4;
+DELETE FROM orc_acid_major WHERE id = 3;
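Note: the two conversion scenarios above produce different file layouts, which is what the reader-side changes below key off. A minimal, purely illustrative Java sketch (directory and file names are assumptions; real base/delta suffixes depend on write ids):

    // Illustrative only: the layouts the two tables above end up with.
    public class AcidLayoutSketch {
        public static void main(String[] args) {
            // orc_to_acid_tb: converted to ACID but never compacted, so the
            // original non-ACID file still sits next to ACID delta directories.
            String[] converted = {
                    "part_col=101/000000_0", // original non-ACID file
                    "part_col=101/delta_0000001_0000001_0000/bucket_00000"
            };
            // orc_to_acid_compacted_tb: after COMPACT 'major', all rows are
            // rewritten under base_* directories and become readable.
            String[] compacted = {
                    "part_col=101/base_0000001/bucket_00000"
            };
            for (String p : converted) {
                System.out.println("orc_to_acid_tb/" + p);
            }
            for (String p : compacted) {
                System.out.println("orc_to_acid_compacted_tb/" + p);
            }
        }
    }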
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 71c7308b079..b554f508103 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
@@ -359,19 +360,24 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
public boolean isHiveTransactionalTable() {
- return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable)
- && isSupportedTransactionalFileFormat();
+ return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable);
}
- private boolean isSupportedTransactionalFileFormat() {
+ private boolean isSupportedFullAcidTransactionalFileFormat() {
// Sometimes we meet "transactional" = "true" but format is parquet, which is not supported.
// So we need to check the input format for transactional table.
String inputFormatName = remoteTable.getSd().getInputFormat();
return inputFormatName != null && SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.contains(inputFormatName);
}
- public boolean isFullAcidTable() {
- return dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable);
+ public boolean isFullAcidTable() throws UserException {
+ if (dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable)) {
+ if (!isSupportedFullAcidTransactionalFileFormat()) {
+ throw new UserException("This table is a full ACID table, but it is not stored in ORC format.");
+ }
+ return true;
+ }
+ return false;
}
@Override
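Note: after this split, isHiveTransactionalTable() answers "is the table transactional at all", while isFullAcidTable() additionally enforces the ORC requirement. A runnable sketch of how AcidUtils (hive-exec 3.x) classifies the two transactional flavors; the hand-built Table object is for illustration only:

    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    import java.util.HashMap;
    import java.util.Map;

    public class AcidKindDemo {
        public static void main(String[] args) {
            Table t = new Table();
            Map<String, String> params = new HashMap<>();
            params.put("transactional", "true");
            params.put("transactional_properties", "insert_only");
            t.setParameters(params);
            // Insert-only tables are transactional but not full ACID, so they
            // now pass isHiveTransactionalTable() without the ORC format check.
            System.out.println(AcidUtils.isTransactionalTable(t)); // true
            System.out.println(AcidUtils.isFullAcidTable(t));      // false
        }
    }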
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 90af6b3f394..838005b47a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -32,12 +32,16 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.common.util.CacheBulkLoader;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
+import org.apache.doris.datasource.hive.HiveUtil.ACIDFileFilter;
+import org.apache.doris.datasource.hive.HiveUtil.FullAcidFileFilter;
+import org.apache.doris.datasource.hive.HiveUtil.InsertOnlyACIDFileFilter;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.fs.remote.RemoteFile;
@@ -55,7 +59,6 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
@@ -77,12 +80,10 @@ import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.URI;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -107,8 +108,6 @@ public class HiveMetaStoreCache {
// After hive 3, transactional tables will have file '_orc_acid_version' with value >= '2'.
public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version";
- private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";
-
private final HMSExternalCatalog catalog;
private JobConf jobConf;
private final ExecutorService refreshExecutor;
@@ -742,19 +741,16 @@ public class HiveMetaStoreCache {
public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
- String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
try {
for (HivePartition partition : partitions) {
+
+ AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(jobConf);
+ HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
+
FileCacheValue fileCacheValue = new FileCacheValue();
- AcidUtils.Directory directory;
- if (!Strings.isNullOrEmpty(remoteUser)) {
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
- directory = ugi.doAs((PrivilegedExceptionAction<AcidUtils.Directory>) () -> AcidUtils.getAcidState(
- new Path(partition.getPath()), jobConf, validWriteIds, false, true));
- } else {
- directory = AcidUtils.getAcidState(new Path(partition.getPath()), jobConf, validWriteIds, false, true);
- }
+ AcidUtils.Directory directory = hadoopAuthenticator.doAs(() -> AcidUtils.getAcidState(
+ new Path(partition.getPath()), jobConf, validWriteIds, false, true));
if (directory == null) {
return Collections.emptyList();
}
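Note: the removed branch-on-remoteUser code is the standard Hadoop impersonation pattern, which HadoopAuthenticator now encapsulates (including the Kerberos case). A minimal sketch of the underlying Hadoop API for reference (UserGroupInformation is the real API; the user name is an assumption):

    import org.apache.hadoop.security.UserGroupInformation;

    import java.security.PrivilegedExceptionAction;

    public class DoAsSketch {
        public static void main(String[] args) throws Exception {
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hive");
            // Everything inside doAs() runs as the remote user, which is what
            // the AcidUtils.getAcidState() call above needs for HDFS access.
            String user = ugi.doAs((PrivilegedExceptionAction<String>) () ->
                    UserGroupInformation.getCurrentUser().getUserName());
            System.out.println(user); // prints "hive"
        }
    }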
@@ -775,7 +771,8 @@ public class HiveMetaStoreCache {
return Collections.emptyList();
}
if (!skipCheckingAcidVersionFile) {
- String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
+ String acidVersionPath = new Path(baseOrDeltaPath, HIVE_ORC_ACID_VERSION_FILE).toUri().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
@@ -798,6 +795,8 @@ public class HiveMetaStoreCache {
}
}
+ ACIDFileFilter fileFilter = isFullAcid ? new FullAcidFileFilter() : new InsertOnlyACIDFileFilter();
+
// delta directories
List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
@@ -810,14 +809,14 @@ public class HiveMetaStoreCache {
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
if (delta.isDeleteDelta()) {
- List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter(
- name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+ List<String> deleteDeltaFileNames = remoteFiles.stream()
+ .map(f -> f.getName()).filter(fileFilter::accept)
.collect(Collectors.toList());
deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
- remoteFiles.stream().filter(
- f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
+ remoteFiles.stream().filter(f -> fileFilter.accept(f.getName()))
+ .forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(), catalog.getProperties());
fileCacheValue.addFile(file, path);
@@ -837,8 +836,7 @@ public class HiveMetaStoreCache {
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
- remoteFiles.stream().filter(
- f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+ remoteFiles.stream().filter(f -> fileFilter.accept(f.getName()))
.forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(), catalog.getProperties());
@@ -848,7 +846,12 @@ public class HiveMetaStoreCache {
throw new RuntimeException(status.getErrMsg());
}
}
- fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
+
+ if (isFullAcid) {
+ fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
+ } else if (!deleteDeltas.isEmpty()) {
+ throw new RuntimeException("Only Hive full ACID tables should contain delete_delta_* directories.");
+ }
fileCacheValues.add(fileCacheValue);
}
} catch (Exception e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index a660cb148ac..c611462f10a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -46,6 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -179,6 +180,25 @@ public class HiveMetadataOps implements ExternalMetadataOps {
props.put("owner", ConnectContext.get().getUserIdentity().getUser());
}
}
+
+ if (props.containsKey("transactional") && props.get("transactional").equals("true")) {
+ throw new UserException("Creating Hive transactional tables is not supported.");
+ /*
+ CREATE TABLE trans6(
+ `col1` int,
+ `col2` int
+ ) ENGINE=hive
+ PROPERTIES (
+ 'file_format'='orc',
+ 'compression'='zlib',
+ 'bucketing_version'='2',
+ 'transactional'='true',
+ 'transactional_properties'='default'
+ );
+ In Hive, such a table can only be inserted into, not updated (updates do not report an error, but no rows are actually changed).
+ */
+ }
+
String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format);
Map<String, String> ddlProps = new HashMap<>();
for (Map.Entry<String, String> entry : props.entrySet()) {
@@ -273,6 +293,11 @@ public class HiveMetadataOps implements ExternalMetadataOps {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tblName, dbName);
}
}
+
+ if (AcidUtils.isTransactionalTable(client.getTable(dbName, tblName))) {
+ throw new DdlException("Dropping Hive transactional tables is not supported.");
+ }
+
try {
client.dropTable(dbName, tblName);
db.setUnInitialized(true);
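Note: a standalone sketch of the two new DDL guards (simplified: plain RuntimeException stands in for UserException/DdlException, and the props map mirrors the PROPERTIES clause from the comment above):

    import java.util.Map;

    public class DdlGuardSketch {
        static void checkCreate(Map<String, String> props) {
            if ("true".equals(props.get("transactional"))) {
                throw new RuntimeException("Creating Hive transactional tables is not supported.");
            }
        }

        static void checkDrop(boolean isTransactionalTable) {
            if (isTransactionalTable) {
                throw new RuntimeException("Dropping Hive transactional tables is not supported.");
            }
        }

        public static void main(String[] args) {
            checkCreate(Map.of("file_format", "orc"));    // passes
            checkDrop(false);                             // passes
            checkCreate(Map.of("transactional", "true")); // throws
        }
    }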
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index ac7dcadbc26..c6acb1dea8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -70,6 +70,9 @@ public final class HiveUtil {
public static final Set<String> SUPPORTED_TEXT_COMPRESSIONS = ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy");
+ public static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";
+ public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
+
private HiveUtil() {
}
@@ -386,4 +389,23 @@ public final class HiveUtil {
return sd;
}
+
+ public interface ACIDFileFilter {
+ public boolean accept(String fileName);
+ }
+
+ public static final class FullAcidFileFilter implements ACIDFileFilter {
+ @Override
+ public boolean accept(String fileName) {
+ return fileName.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)
+ && !fileName.endsWith(DELTA_SIDE_FILE_SUFFIX);
+ }
+ }
+
+ public static final class InsertOnlyACIDFileFilter implements ACIDFileFilter {
+ @Override
+ public boolean accept(String fileName) {
+ return true;
+ }
+ }
}
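Note: full-ACID deltas contain bucket_N files (plus _flush_length side files written by streaming ingest, which must be skipped), while insert-only base/delta directories keep ordinary file names, so every file is accepted. A runnable standalone mirror of the two filters (not the Doris classes themselves):

    public class AcidFileFilterSketch {
        static final String BUCKET_PREFIX = "bucket_";
        static final String SIDE_FILE_SUFFIX = "_flush_length";

        // Mirrors FullAcidFileFilter: bucket files only, minus side files.
        static boolean fullAcid(String name) {
            return name.startsWith(BUCKET_PREFIX) && !name.endsWith(SIDE_FILE_SUFFIX);
        }

        // Mirrors InsertOnlyACIDFileFilter: insert-only directories hold plain
        // data files (e.g. 000000_0), so every name is accepted.
        static boolean insertOnly(String name) {
            return true;
        }

        public static void main(String[] args) {
            System.out.println(fullAcid("bucket_00000"));              // true
            System.out.println(fullAcid("bucket_00000_flush_length")); // false
            System.out.println(insertOnly("000000_0"));                // true
        }
    }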
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 0c53f971ea4..1e09fa6d909 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -267,7 +267,16 @@ public class HiveScanNode extends FileQueryScanNode {
List<Split> allFiles, String bindBrokerName, int numBackends) throws IOException, UserException {
List<FileCacheValue> fileCaches;
if (hiveTransaction != null) {
- fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
+ try {
+ fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
+ } catch (Exception e) {
+ // Release the shared lock acquired by getValidWriteIds().
+ // If no exception is thrown, the lock is released in `finalizeQuery()`.
+ // TODO: merge HMSTransaction, HiveTransaction, HiveTransactionMgr, HiveTransactionManager
+ // and redesign the logic of this code.
+ Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId());
+ throw e;
+ }
} else {
boolean withCache = Config.max_external_file_cache_num > 0;
fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName);
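Note: the try/catch added above is a release-on-failure pattern: the shared lock taken while fetching valid write ids is normally released in finalizeQuery(), so the error path must release it explicitly or it leaks. A generic sketch of the pattern (illustrative names, not the Doris transaction-manager API):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ReleaseOnFailureSketch {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        void scan(boolean fail) {
            lock.readLock().lock(); // taken up front, like getValidWriteIds()
            try {
                if (fail) {
                    throw new RuntimeException("split generation failed");
                }
                // Success path: the lock is deliberately NOT released here;
                // finalizeQuery() is responsible for it later.
            } catch (RuntimeException e) {
                lock.readLock().unlock(); // error path: release immediately
                throw e;
            }
        }

        void finalizeQuery() {
            lock.readLock().unlock(); // normal release point
        }
    }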
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 023b205ac53..9048f6c3a03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -263,6 +263,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
} else if (physicalSink instanceof PhysicalHiveTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
+ if (hiveExternalTable.isHiveTransactionalTable()) {
+ throw new AnalysisException("Insert into Hive transactional tables is not supported.");
+ }
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))), emptyInsert);
// set hive query options
diff --git a/regression-test/data/external_table_p0/hive/test_transactional_hive.out b/regression-test/data/external_table_p0/hive/test_transactional_hive.out
index 2ad0446ccb7..060fa8c048e 100644
--- a/regression-test/data/external_table_p0/hive/test_transactional_hive.out
+++ b/regression-test/data/external_table_p0/hive/test_transactional_hive.out
@@ -76,3 +76,49 @@ F
-- !q05 --
0
+
+-- !2 --
+1 A 101
+2 BB 102
+3 CC 101
+4 D 102
+
+-- !3 --
+1 A 101
+3 CC 101
+
+-- !4 --
+2 BB 102
+4 D 102
+
+-- !5 --
+1 A 101
+2 BB 102
+
+-- !6 --
+4 D 102
+
+-- !7 --
+1 A
+2 BB
+4 DD
+
+-- !10 --
+1 A
+2 BB
+
+-- !11 --
+4 DD
+
+-- !12 --
+1 A
+2 BB
+4 DD
+
+-- !15 --
+1 A
+2 BB
+
+-- !16 --
+4 DD
+
diff --git a/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out
new file mode 100644
index 00000000000..04fccc9d4c0
--- /dev/null
+++ b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out
@@ -0,0 +1,20 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !1 --
+1 A
+2 B
+3 C
+4 D
+
+-- !2 --
+1 A
+2 B
+3 C
+4 D
+5 E
+
+-- !3 --
+1 A
+2 B
+3 C
+4 D
+5 E
diff --git a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy
index dbe20395ec9..81f2358e9da 100644
--- a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy
@@ -52,6 +52,71 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock
select count(*) from orc_full_acid_par_empty;
"""
}
+
+
+ def test_acid = {
+ try {
+ sql """SET enable_fallback_to_original_planner=false;"""
+ sql """ select * from orc_to_acid_tb """
+ } catch (Exception e) {
+ logger.info("e.getMessage()" + e.getMessage())
+ assertTrue(e.getMessage().contains("Original non-ACID files in transactional tables are not supported"));
+ }
+
+ qt_2 """ select * from orc_to_acid_compacted_tb order by id """
+ qt_3 """ select * from orc_to_acid_compacted_tb where part_col=101 order by id """
+ qt_4 """ select * from orc_to_acid_compacted_tb where part_col=102 order by id """
+ qt_5 """ select * from orc_to_acid_compacted_tb where id < 3 order by id """
+ qt_6 """ select * from orc_to_acid_compacted_tb where id > 3 order by id """
+
+
+ qt_7 """ select * from orc_acid_minor order by id """
+ qt_10 """ select * from orc_acid_minor where id < 3 order by id """
+ qt_11 """ select * from orc_acid_minor where id > 3 order by id """
+
+
+ qt_12 """ select * from orc_acid_major order by id """
+ qt_15 """ select * from orc_acid_major where id < 3 order by id """
+ qt_16 """ select * from orc_acid_major where id > 3 order by id """
+ }
+
+ def test_acid_write = {
+ sql """set enable_fallback_to_original_planner=false;"""
+
+
+
+ try {
+ sql """
+ CREATE TABLE acid_tb (
+ `col1` BOOLEAN COMMENT 'col1',
+ `col2` INT COMMENT 'col2'
+ ) ENGINE=hive
+ PROPERTIES (
+ 'file_format'='orc',
+ 'compression'='zlib',
+ 'bucketing_version'='2',
+ 'transactional'='true',
+ 'transactional_properties'='default'
+ );
+ """
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Creating Hive transactional tables is not supported."));
+ }
+ try {
+ sql """ insert into orc_acid_major(id,value) values(1,"a1"); """
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Insert into Hive transactional tables is not supported."));
+ }
+
+ try {
+ sql """ drop table orc_acid_major; """
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Dropping Hive transactional tables is not supported."));
+
+ }
+ }
+
+
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("diable Hive test.")
@@ -60,6 +125,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock
for (String hivePrefix : ["hive3"]) {
try {
+ String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
String catalog_name = "test_transactional_${hivePrefix}"
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -68,6 +134,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock
sql """create catalog if not exists ${catalog_name} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+ ,'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}'
);"""
sql """use `${catalog_name}`.`default`"""
@@ -79,6 +146,9 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock
q01()
q01_par()
+ test_acid()
+ test_acid_write()
+
sql """drop catalog if exists ${catalog_name}"""
} finally {
}
diff --git a/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy
new file mode 100644
index 00000000000..9b021e1dc81
--- /dev/null
+++ b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_translation_insert_only",
"p2,external,hive,external_remote,external_remote_hive") {
+
+ String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+ // Hudi and Hive share the same catalog in p2.
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable test")
+ return;
+ }
+
+ String props = context.config.otherConfigs.get("hudiEmrCatalog")
+ String hms_catalog_name = "test_hive_translation_insert_only"
+
+ sql """drop catalog if exists ${hms_catalog_name};"""
+ sql """
+ CREATE CATALOG IF NOT EXISTS ${hms_catalog_name}
+ PROPERTIES (
+ ${props}
+ ,'hive.version' = '3.1.3'
+ );
+ """
+
+ logger.info("catalog " + hms_catalog_name + " created")
+ sql """switch ${hms_catalog_name};"""
+ logger.info("switched to catalog " + hms_catalog_name)
+ sql """ use regression;"""
+
+ qt_1 """ select * from text_insert_only order by id """
+ qt_2 """ select * from parquet_insert_only_major order by id """
+ qt_3 """ select * from orc_insert_only_minor order by id """
+
+ sql """drop catalog ${hms_catalog_name};"""
+}
+
+
+/*
+SET hive.support.concurrency=true;
+SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+create table text_insert_only (id INT, value STRING)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
+TBLPROPERTIES ('transactional' = 'true',
+'transactional_properties'='insert_only');
+insert into text_insert_only values (1, 'A');
+insert into text_insert_only values (2, 'B');
+insert into text_insert_only values (3, 'C');
+Load data xxx (4,D)
+create table parquet_insert_only_major (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS parquet
+TBLPROPERTIES ('transactional' = 'true',
+'transactional_properties'='insert_only');
+insert into parquet_insert_only_major values (1, 'A');
+insert into parquet_insert_only_major values (2, 'B');
+insert into parquet_insert_only_major values (3, 'C');
+ALTER TABLE parquet_insert_only_major COMPACT 'major';
+insert into parquet_insert_only_major values (4, 'D');
+insert into parquet_insert_only_major values (5, 'E');
+create table orc_insert_only_minor (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+stored as orc
+TBLPROPERTIES ('transactional' = 'true',
+'transactional_properties'='insert_only');
+insert into orc_insert_only_minor values (1, 'A');
+insert into orc_insert_only_minor values (2, 'B');
+insert into orc_insert_only_minor values (3, 'C');
+ALTER TABLE orc_insert_only_minor COMPACT 'minor';
+insert into orc_insert_only_minor values (4, 'D');
+insert into orc_insert_only_minor values (5, 'E');
+*/
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]