[
https://issues.apache.org/jira/browse/IMPALA-11189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Qifan Chen updated IMPALA-11189:
--------------------------------
Description:
Stress test test_concurrent_inserts (in tests/stress/test_acid_stress.py) can
fail repeatedly in local catalog mode. In this case, the concurrent checker
query (select * from <table>) returns duplicated rows such as reported below,
where row [0,2] is duplicated.
The failure can be reproduced quite easily by running the test (i.e.,
TestConcurrentAcidInserts) first, which can be done by commenting out all the
tests prior to it in the test file tests/stress/test_acid_stress.py.
Setup:
1. Build Impala and clear HMS in case it is in a bad state:
$IMPALA_HOME/buildall.sh -format_metastore -notests
2. Start the cluster in local catalog mode:
$IMPALA_HOME/bin/start-impala-cluster.py --impalad_args
--use_local_catalog=true --catalogd_args --catalog_topic_mode=minimal
--catalogd_args --hms_event_polling_interval_s=1
3. Run the modified stress test: $IMPALA_HOME/bin/impala-py.test
$IMPALA_TESTS/stress/test_acid_stress.py
Error reported:
{code:java}
09:11:00 qchen@qifan-10229: Impala.03112022] test_acid_stress
rootLoggerLevel = INFO
================================================== test session starts
===================================================
platform linux2 -- Python 2.7.16, pytest-2.9.2, py-1.4.32, pluggy-0.3.1 --
/home/qchen/Impala.03112022/infra/python/env-gcc7.5.0/bin/python
cachedir: tests/.cache
rootdir: /home/qchen/Impala.03112022/tests, inifile: pytest.ini
plugins: xdist-1.17.1, timeout-1.2.1, random-0.2, forked-0.2
timeout: 7200s method: signal
collected 2 items
tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::test_concurrent_inserts[unique_database0]
FAILED
tests/stress/test_acid_stress.py::TestFailingAcidInserts::test_failing_inserts[unique_database0]
PASSED
================================================ short test summary info
=================================================
FAIL
tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]
======================================================== FAILURES
========================================================
__________________________
TestConcurrentAcidInserts.test_concurrent_inserts[unique_database0]
___________________________
tests/stress/test_acid_stress.py:307: in test_concurrent_inserts
run_tasks(writers + checkers)
tests/stress/stress_util.py:45: in run_tasks
pool.map_async(Task.run, tasks).get(timeout_seconds)
../Impala.03082022/toolchain/toolchain-packages-gcc7.5.0/python-2.7.16/lib/python2.7/multiprocessing/pool.py:572:
in get
raise self._value
E AssertionError: wid: 2
E assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
E At index 3 diff: 2 != 3
E Left contains more items, first extra item: 4
E Full diff:
E - [0, 1, 2, 2, 3, 4]
E ? ---
E + [0, 1, 2, 3, 4]
------------------------------------------------- Captured stderr setup
--------------------------------------------------
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21000
-- connecting to localhost:21050 with impyla
-- 2022-03-16 09:20:54,762 INFO MainThread: Closing active operation
-- connecting to localhost:28000 with impyla
-- 2022-03-16 09:20:54,774 INFO MainThread: Closing active operation
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET sync_ddl=True;
-- executing against localhost:21000
DROP DATABASE IF EXISTS `test_concurrent_inserts_8933345c` CASCADE;
-- 2022-03-16 09:20:54,808 INFO MainThread: Started query
28457f4c7e77cdec:c6d3731900000000
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET sync_ddl=True;
-- executing against localhost:21000
CREATE DATABASE `test_concurrent_inserts_8933345c`;
-- 2022-03-16 09:20:54,877 INFO MainThread: Started query
374bf99aea680523:48d2405400000000
-- 2022-03-16 09:21:01,164 INFO MainThread: Created database
"test_concurrent_inserts_8933345c" for test ID
"stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]"
-------------------------------------------------- Captured stderr call
--------------------------------------------------
SET SYNC_DDL=true;
-- executing against localhost:21000
drop table if exists test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:01,173 INFO MainThread: Started query
20480c2a1d336d35:c2d84edd00000000
-- executing against localhost:21000
create table test_concurrent_inserts_8933345c.test_concurrent_inserts (wid int,
i int) TBLPROPERTIES (
'transactional_properties' = 'insert_only', 'transactional' = 'true')
;
-- 2022-03-16 09:21:01,294 INFO MainThread: Started query
754969473483b4e9:acfc852300000000
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21000
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21001
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- executing against localhost:21000
-- connecting to: localhost:21000
-- connecting to: localhost:21002
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21001
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21002
-- connecting to: localhost:21000
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (0, 0);
-- executing against localhost:21001
-- connecting to: localhost:21002
-- executing against localhost:21002
-- executing against localhost:21000
-- connecting to: localhost:21001
-- executing against localhost:21001
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (1, 0);
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 0);
-- executing against localhost:21000
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (3, 0);
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (4, 0);
-- executing against localhost:21001
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (5, 0);
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:07,824 INFO Thread-3: Started query
6f4e9d38d1723252:a6a7368f00000000
-- 2022-03-16 09:21:07,859 INFO Thread-4: Started query
33423e0145b7d3e0:ea8d626200000000
-- 2022-03-16 09:21:07,861 INFO Thread-8: Started query
4046d8edde82931b:aa11c84800000000
-- 2022-03-16 09:21:07,875 INFO Thread-9: Started query
594d63b7814c31ab:8f2b92ca00000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 1);
-- 2022-03-16 09:21:08,229 INFO Thread-4: Started query
a94dab601c125bb4:8988934300000000
-- executing against localhost:21000
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 2);
-- 2022-03-16 09:21:08,384 INFO Thread-3: Started query
4040597eba7beb36:4cfa08c800000000
-- 2022-03-16 09:21:08,409 INFO Thread-4: Started query
854e1aee63575861:dbf8bafb00000000
-- executing against localhost:21002
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- executing against localhost:21001
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:08,435 INFO Thread-9: Started query
7e4d4cf54a44cf03:868efcb100000000
-- 2022-03-16 09:21:08,456 INFO Thread-8: Started query
4645788cf2a4d401:0aea969e00000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 3);
-- 2022-03-16 09:21:08,586 INFO Thread-4: Started query
4e4cdd976dfa3358:6c455b7300000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 4);
-- 2022-03-16 09:21:08,711 INFO Thread-4: Started query
6f46942eb8807e5f:ffbb146000000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 5);
-- executing against localhost:21000
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:08,959 INFO Thread-3: Started query
d940fa405a776bef:3705218900000000
-- 2022-03-16 09:21:08,977 INFO Thread-4: Started query
f947061a7c45901c:12b2ba9d00000000
-- executing against localhost:21001
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:08,997 INFO Thread-9: Started query
f54a0aa01bbb2f86:3b89ae9600000000
-- executing against localhost:21002
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:09,147 INFO Thread-8: Started query
fa418523bc0552ce:f0e49b1a00000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 6);
-- 2022-03-16 09:21:09,250 INFO Thread-4: Started query
fc4751d49d1a1aa2:4bcb48d600000000
-- closing connection to: localhost:21002
Traceback (most recent call last):
File "/home/qchen/Impala.03112022/tests/stress/stress_util.py", line 35, in
run
return self.func(*self.args, **self.kwargs)
File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
276, in _impala_role_concurrent_checker
verify_result_set(result)
File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
269, in verify_result_set
assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), "wid: %d" %
wid
AssertionError: wid: 2
assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
At index 3 diff: 2 != 3
Left contains more items, first extra item: 4
Full diff:
- [0, 1, 2, 2, 3, 4]
? ---
+ [0, 1, 2, 3, 4]
========================================== 1 failed, 1 passed in 158.88 seconds
==========================================
[09:23:33 qchen@qifan-10229: Impala.03112022] git branch
IMPALA-10992-auto-scaling-planner-support
* master
[09:23:41 qchen@qifan-10229: Impala.03112022] git diff
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index a921bb961..6df592f3f 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1644,6 +1644,7 @@ public class Frontend {
markTimelineRetries(attempt, retryMsg, timeline);
return req;
} catch (InconsistentMetadataFetchException e) {
+ LOG.error("BAR: catch an InconsistentMetadataFetchException");
if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
markTimelineRetries(attempt, e.getMessage(), timeline);
throw e;
@@ -1819,14 +1820,18 @@ public class Frontend {
} catch (Exception e) {
if (queryCtx.isSetTransaction_id()) {
try {
+ LOG.error("BAR: to abort Transaction");
abortTransaction(queryCtx.getTransaction_id());
+ LOG.error("BAR: Transaction aborted");
timeline.markEvent("Transaction aborted");
} catch (TransactionException te) {
LOG.error("Could not abort transaction because: " + te.getMessage());
}
} else if (queryCtx.isIs_kudu_transactional()) {
try {
+ LOG.error("BAR: to abort kudu Transaction");
abortKuduTransaction(queryCtx.getQuery_id());
+ LOG.error("BAR: kudu Transaction aborted");
timeline.markEvent(
"Kudu transaction aborted: " +
queryCtx.getQuery_id().toString());
} catch (TransactionException te) {
diff --git a/tests/stress/test_acid_stress.py b/tests/stress/test_acid_stress.py
index f6439ff3c..09ec874e7 100644
--- a/tests/stress/test_acid_stress.py
+++ b/tests/stress/test_acid_stress.py
@@ -46,188 +46,188 @@ class TestAcidStress(ImpalaTestSuite):
v.get_value('table_format').compression_codec == 'none'))
-class TestAcidInsertsBasic(TestAcidStress):
- @classmethod
- def get_workload(self):
- return super(TestAcidInsertsBasic, self).get_workload()
-
- @classmethod
- def add_test_dimensions(cls):
- super(TestAcidInsertsBasic, cls).add_test_dimensions()
-
- def _verify_result(self, result, expected_result):
- """Verify invariants for 'run' and 'i'."""
- assert len(result.data) > 0
- run_max = -1
- i_list = []
- for line in result.data:
- [run, i] = map(int, (line.split('\t')))
- run_max = max(run_max, run)
- i_list.append(i)
- assert expected_result["run"] <= run_max # shouldn't see data overwritten
in the past
- i_list.sort()
- if expected_result["run"] < run_max:
- expected_result["run"] = run_max
- expected_result["i"] = 0
- return
- assert i_list[-1] >= expected_result["i"]
- assert i_list == range(i_list[-1] + 1) # 'i' should have all values from
0 to max_i
- expected_result["i"] = i_list[-1]
-
- def _hive_role_write_inserts(self, tbl_name, partitioned):
- """INSERT INTO/OVERWRITE a table several times from Hive."""
- part_expr = "partition (p=1)" if partitioned else ""
- for run in xrange(0, NUM_OVERWRITES):
- OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
- """ % (tbl_name, part_expr, run, 0)
- self.run_stmt_in_hive(OVERWRITE_SQL)
- for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
- INSERT_SQL = """insert into table %s %s values (%i, %i)
- """ % (tbl_name, part_expr, run, i)
- self.run_stmt_in_hive(INSERT_SQL)
-
- def _impala_role_write_inserts(self, tbl_name, partitioned):
- """INSERT INTO/OVERWRITE a table several times from Impala."""
- try:
- impalad_client = ImpalaTestSuite.create_impala_client()
- part_expr = "partition (p=1)" if partitioned else ""
- for run in xrange(0, NUM_OVERWRITES + 1):
- OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
- """ % (tbl_name, part_expr, run, 0)
- impalad_client.execute(OVERWRITE_SQL)
- for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
- INSERT_SQL = """insert into table %s %s values (%i, %i)
- """ % (tbl_name, part_expr, run, i)
- impalad_client.execute(INSERT_SQL)
- finally:
- impalad_client.close()
-
- def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
- """SELECT from a table many times until the expected final values are
found."""
- try:
- impalad_client = ImpalaTestSuite.create_impala_client()
- expected_result = {"run": -1, "i": 0}
- accept_empty_table = True
- while expected_result["run"] != NUM_OVERWRITES and \
- expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
- time.sleep(sleep_seconds)
- if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
- result = impalad_client.execute("select run, i from %s" % tbl_name)
- if len(result.data) == 0:
- assert accept_empty_table
- continue
- accept_empty_table = False
- self._verify_result(result, expected_result)
- finally:
- impalad_client.close()
-
- def _create_table(self, full_tbl_name, partitioned):
- """Creates test table with name 'full_tbl_name'. Table is partitioned if
- 'partitioned' is set to True."""
- part_expr = "partitioned by (p int)" if partitioned else ""
-
- CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
- 'transactional_properties' = 'insert_only', 'transactional' = 'true')
- """ % (full_tbl_name, part_expr)
- self.client.execute("drop table if exists %s" % full_tbl_name)
- self.client.execute(CREATE_SQL)
-
- def _run_test_read_hive_inserts(self, unique_database, partitioned):
- """Check that Impala can read a single insert only ACID table
(over)written by Hive
- several times. Consistency can be checked by using incremental values for
- overwrites ('run') and inserts ('i').
- """
- tbl_name = "%s.test_read_hive_inserts" % unique_database
- self._create_table(tbl_name, partitioned)
-
- run_tasks([
- Task(self._hive_role_write_inserts, tbl_name, partitioned),
- Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
- sleep_seconds=3)])
-
- def _run_test_read_impala_inserts(self, unique_database, partitioned):
- """Check that Impala can read a single insert only ACID table
(over)written by Hive
- several times. Consistency can be checked by using incremental values for
- overwrites ('run') and inserts ('i').
- """
- tbl_name = "%s.test_read_impala_inserts" % unique_database
- self._create_table(tbl_name, partitioned)
-
- run_tasks([
- Task(self._impala_role_write_inserts, tbl_name, partitioned),
- Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
- sleep_seconds=0.1)])
-
- @SkipIfHive2.acid
- @SkipIfS3.hive
- @SkipIfGCS.hive
- @SkipIfCOS.hive
- @pytest.mark.execute_serially
- @pytest.mark.stress
- def test_read_hive_inserts(self, unique_database):
- """Check that Impala can read partitioned and non-partitioned ACID tables
- written by Hive."""
- for is_partitioned in [False, True]:
- self._run_test_read_hive_inserts(unique_database, is_partitioned)
-
- @SkipIfHive2.acid
- @pytest.mark.execute_serially
- @pytest.mark.stress
- def test_read_impala_inserts(self, unique_database):
- """Check that Impala can read partitioned and non-partitioned ACID tables
- written by Hive."""
- for is_partitioned in [False, True]:
- self._run_test_read_impala_inserts(unique_database, is_partitioned)
-
- def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite,
sleep_sec):
- insert_op = "OVERWRITE" if is_overwrite else "INTO"
- try:
- impalad_client = ImpalaTestSuite.create_impala_client()
- impalad_client.execute(
- """insert {op} table {tbl_name} partition({partition})
- select sleep({sleep_ms})""".format(op=insert_op,
tbl_name=tbl_name,
- partition=partition, sleep_ms=sleep_sec * 1000))
- finally:
- impalad_client.close()
-
- @pytest.mark.execute_serially
- @pytest.mark.stress
- @SkipIf.not_hdfs
- @UniqueDatabase.parametrize(sync_ddl=True)
- def test_partitioned_inserts(self, unique_database):
- """Check that the different ACID write operations take appropriate locks.
- INSERT INTO: should take a shared lock
- INSERT OVERWRITE: should take an exclusive lock
- Both should take PARTITION-level lock in case of static partition
insert."""
- tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
- self.client.set_configuration_option("SYNC_DDL", "true")
- self.client.execute("""
- CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
- TBLPROPERTIES(
-
'transactional_properties'='insert_only','transactional'='true')""".format(
- tbl_name))
- # Warmup INSERT
- self.execute_query("alter table {0} add
partition(p=0,q=0)".format(tbl_name))
- sleep_sec = 5
- task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
- "p=1,q=1", False, sleep_sec)
- # INSERT INTO the same partition can run in parallel.
- duration = run_tasks([task_insert_into, task_insert_into])
- assert duration < 3 * sleep_sec
- task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name,
- "p=1,q=1", True, sleep_sec)
- # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
- duration = run_tasks([task_insert_into, task_insert_overwrite])
- assert duration > 4 * sleep_sec
- # INSERT OVERWRITEs to the same partition should have mutual exclusion.
- duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
- assert duration > 4 * sleep_sec
- task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
tbl_name,
- "p=1,q=2", True, sleep_sec)
- # INSERT OVERWRITEs to different partitions can run in parallel.
- duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
- assert duration < 3 * sleep_sec
-
+#class TestAcidInsertsBasic(TestAcidStress):
+# @classmethod
+# def get_workload(self):
+# return super(TestAcidInsertsBasic, self).get_workload()
+#
+# @classmethod
+# def add_test_dimensions(cls):
+# super(TestAcidInsertsBasic, cls).add_test_dimensions()
+#
+# def _verify_result(self, result, expected_result):
+# """Verify invariants for 'run' and 'i'."""
+# assert len(result.data) > 0
+# run_max = -1
+# i_list = []
+# for line in result.data:
+# [run, i] = map(int, (line.split('\t')))
+# run_max = max(run_max, run)
+# i_list.append(i)
+# assert expected_result["run"] <= run_max # shouldn't see data
overwritten in the past
+# i_list.sort()
+# if expected_result["run"] < run_max:
+# expected_result["run"] = run_max
+# expected_result["i"] = 0
+# return
+# assert i_list[-1] >= expected_result["i"]
+# assert i_list == range(i_list[-1] + 1) # 'i' should have all values from
0 to max_i
+# expected_result["i"] = i_list[-1]
+#
+# def _hive_role_write_inserts(self, tbl_name, partitioned):
+# """INSERT INTO/OVERWRITE a table several times from Hive."""
+# part_expr = "partition (p=1)" if partitioned else ""
+# for run in xrange(0, NUM_OVERWRITES):
+# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
+# """ % (tbl_name, part_expr, run, 0)
+# self.run_stmt_in_hive(OVERWRITE_SQL)
+# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
+# INSERT_SQL = """insert into table %s %s values (%i, %i)
+# """ % (tbl_name, part_expr, run, i)
+# self.run_stmt_in_hive(INSERT_SQL)
+#
+# def _impala_role_write_inserts(self, tbl_name, partitioned):
+# """INSERT INTO/OVERWRITE a table several times from Impala."""
+# try:
+# impalad_client = ImpalaTestSuite.create_impala_client()
+# part_expr = "partition (p=1)" if partitioned else ""
+# for run in xrange(0, NUM_OVERWRITES + 1):
+# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
+# """ % (tbl_name, part_expr, run, 0)
+# impalad_client.execute(OVERWRITE_SQL)
+# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
+# INSERT_SQL = """insert into table %s %s values (%i, %i)
+# """ % (tbl_name, part_expr, run, i)
+# impalad_client.execute(INSERT_SQL)
+# finally:
+# impalad_client.close()
+#
+# def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
+# """SELECT from a table many times until the expected final values are
found."""
+# try:
+# impalad_client = ImpalaTestSuite.create_impala_client()
+# expected_result = {"run": -1, "i": 0}
+# accept_empty_table = True
+# while expected_result["run"] != NUM_OVERWRITES and \
+# expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
+# time.sleep(sleep_seconds)
+# if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
+# result = impalad_client.execute("select run, i from %s" % tbl_name)
+# if len(result.data) == 0:
+# assert accept_empty_table
+# continue
+# accept_empty_table = False
+# self._verify_result(result, expected_result)
+# finally:
+# impalad_client.close()
+#
+# def _create_table(self, full_tbl_name, partitioned):
+# """Creates test table with name 'full_tbl_name'. Table is partitioned if
+# 'partitioned' is set to True."""
+# part_expr = "partitioned by (p int)" if partitioned else ""
+#
+# CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
+# 'transactional_properties' = 'insert_only', 'transactional' = 'true')
+# """ % (full_tbl_name, part_expr)
+# self.client.execute("drop table if exists %s" % full_tbl_name)
+# self.client.execute(CREATE_SQL)
+#
+# def _run_test_read_hive_inserts(self, unique_database, partitioned):
+# """Check that Impala can read a single insert only ACID table
(over)written by Hive
+# several times. Consistency can be checked by using incremental values for
+# overwrites ('run') and inserts ('i').
+# """
+# tbl_name = "%s.test_read_hive_inserts" % unique_database
+# self._create_table(tbl_name, partitioned)
+#
+# run_tasks([
+# Task(self._hive_role_write_inserts, tbl_name, partitioned),
+# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
+# sleep_seconds=3)])
+#
+# def _run_test_read_impala_inserts(self, unique_database, partitioned):
+# """Check that Impala can read a single insert only ACID table
(over)written by Hive
+# several times. Consistency can be checked by using incremental values for
+# overwrites ('run') and inserts ('i').
+# """
+# tbl_name = "%s.test_read_impala_inserts" % unique_database
+# self._create_table(tbl_name, partitioned)
+#
+# run_tasks([
+# Task(self._impala_role_write_inserts, tbl_name, partitioned),
+# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
+# sleep_seconds=0.1)])
+#
+# @SkipIfHive2.acid
+# @SkipIfS3.hive
+# @SkipIfGCS.hive
+# @SkipIfCOS.hive
+# @pytest.mark.execute_serially
+# @pytest.mark.stress
+# def test_read_hive_inserts(self, unique_database):
+# """Check that Impala can read partitioned and non-partitioned ACID tables
+# written by Hive."""
+# for is_partitioned in [False, True]:
+# self._run_test_read_hive_inserts(unique_database, is_partitioned)
+#
+# @SkipIfHive2.acid
+# @pytest.mark.execute_serially
+# @pytest.mark.stress
+# def test_read_impala_inserts(self, unique_database):
+# """Check that Impala can read partitioned and non-partitioned ACID tables
+# written by Hive."""
+# for is_partitioned in [False, True]:
+# self._run_test_read_impala_inserts(unique_database, is_partitioned)
+#
+# def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite,
sleep_sec):
+# insert_op = "OVERWRITE" if is_overwrite else "INTO"
+# try:
+# impalad_client = ImpalaTestSuite.create_impala_client()
+# impalad_client.execute(
+# """insert {op} table {tbl_name} partition({partition})
+# select sleep({sleep_ms})""".format(op=insert_op,
tbl_name=tbl_name,
+# partition=partition, sleep_ms=sleep_sec * 1000))
+# finally:
+# impalad_client.close()
+#
+# @pytest.mark.execute_serially
+# @pytest.mark.stress
+# @SkipIf.not_hdfs
+# @UniqueDatabase.parametrize(sync_ddl=True)
+# def test_partitioned_inserts(self, unique_database):
+# """Check that the different ACID write operations take appropriate locks.
+# INSERT INTO: should take a shared lock
+# INSERT OVERWRITE: should take an exclusive lock
+# Both should take PARTITION-level lock in case of static partition
insert."""
+# tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
+# self.client.set_configuration_option("SYNC_DDL", "true")
+# self.client.execute("""
+# CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
+# TBLPROPERTIES(
+#
'transactional_properties'='insert_only','transactional'='true')""".format(
+# tbl_name))
+# # Warmup INSERT
+# self.execute_query("alter table {0} add
partition(p=0,q=0)".format(tbl_name))
+# sleep_sec = 5
+# task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
+# "p=1,q=1", False, sleep_sec)
+# # INSERT INTO the same partition can run in parallel.
+# duration = run_tasks([task_insert_into, task_insert_into])
+# assert duration < 3 * sleep_sec
+# task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name,
+# "p=1,q=1", True, sleep_sec)
+# # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
+# duration = run_tasks([task_insert_into, task_insert_overwrite])
+# assert duration > 4 * sleep_sec
+# # INSERT OVERWRITEs to the same partition should have mutual exclusion.
+# duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
+# assert duration > 4 * sleep_sec
+# task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
tbl_name,
+# "p=1,q=2", True, sleep_sec)
+# # INSERT OVERWRITEs to different partitions can run in parallel.
+# duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
+# assert duration < 3 * sleep_sec
+#
class TestConcurrentAcidInserts(TestAcidStress):
@classmethod
(END)
{code}
was:
Stress test test_concurrent_inserts (in tests/stress/test_acid_stress.py) fails
repeatedly in local catalog mode. The concurrent checker query (select * from
<table>) can return duplicated rows such as reported below, where row [0,2] is
duplicated.
This can be reproduced quite easily by running the test (i.e.,
TestConcurrentAcidInserts) first, via commenting out all the tests prior to it
in the test file tests/stress/test_acid_stress.py.
Setup:
1. Build Impala and clear HMS in case it is in a bad state:
$IMPALA_HOME/buildall.sh -format_metastore -notests
2. Start the cluster in local catalog mode:
$IMPALA_HOME/bin/start-impala-cluster.py --impalad_args
--use_local_catalog=true --catalogd_args --catalog_topic_mode=minimal
--catalogd_args --hms_event_polling_interval_s=1
3. Run the modified stress test: $IMPALA_HOME/bin/impala-py.test
$IMPALA_TESTS/stress/test_acid_stress.py
Error reported:
{code:java}
09:11:00 qchen@qifan-10229: Impala.03112022] test_acid_stress
rootLoggerLevel = INFO
================================================== test session starts
===================================================
platform linux2 -- Python 2.7.16, pytest-2.9.2, py-1.4.32, pluggy-0.3.1 --
/home/qchen/Impala.03112022/infra/python/env-gcc7.5.0/bin/python
cachedir: tests/.cache
rootdir: /home/qchen/Impala.03112022/tests, inifile: pytest.ini
plugins: xdist-1.17.1, timeout-1.2.1, random-0.2, forked-0.2
timeout: 7200s method: signal
collected 2 items
tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::test_concurrent_inserts[unique_database0]
FAILED
tests/stress/test_acid_stress.py::TestFailingAcidInserts::test_failing_inserts[unique_database0]
PASSED
================================================ short test summary info
=================================================
FAIL
tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]
======================================================== FAILURES
========================================================
__________________________
TestConcurrentAcidInserts.test_concurrent_inserts[unique_database0]
___________________________
tests/stress/test_acid_stress.py:307: in test_concurrent_inserts
run_tasks(writers + checkers)
tests/stress/stress_util.py:45: in run_tasks
pool.map_async(Task.run, tasks).get(timeout_seconds)
../Impala.03082022/toolchain/toolchain-packages-gcc7.5.0/python-2.7.16/lib/python2.7/multiprocessing/pool.py:572:
in get
raise self._value
E AssertionError: wid: 2
E assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
E At index 3 diff: 2 != 3
E Left contains more items, first extra item: 4
E Full diff:
E - [0, 1, 2, 2, 3, 4]
E ? ---
E + [0, 1, 2, 3, 4]
------------------------------------------------- Captured stderr setup
--------------------------------------------------
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21000
-- connecting to localhost:21050 with impyla
-- 2022-03-16 09:20:54,762 INFO MainThread: Closing active operation
-- connecting to localhost:28000 with impyla
-- 2022-03-16 09:20:54,774 INFO MainThread: Closing active operation
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET sync_ddl=True;
-- executing against localhost:21000
DROP DATABASE IF EXISTS `test_concurrent_inserts_8933345c` CASCADE;
-- 2022-03-16 09:20:54,808 INFO MainThread: Started query
28457f4c7e77cdec:c6d3731900000000
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET sync_ddl=True;
-- executing against localhost:21000
CREATE DATABASE `test_concurrent_inserts_8933345c`;
-- 2022-03-16 09:20:54,877 INFO MainThread: Started query
374bf99aea680523:48d2405400000000
-- 2022-03-16 09:21:01,164 INFO MainThread: Created database
"test_concurrent_inserts_8933345c" for test ID
"stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]"
-------------------------------------------------- Captured stderr call
--------------------------------------------------
SET SYNC_DDL=true;
-- executing against localhost:21000
drop table if exists test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:01,173 INFO MainThread: Started query
20480c2a1d336d35:c2d84edd00000000
-- executing against localhost:21000
create table test_concurrent_inserts_8933345c.test_concurrent_inserts (wid int,
i int) TBLPROPERTIES (
'transactional_properties' = 'insert_only', 'transactional' = 'true')
;
-- 2022-03-16 09:21:01,294 INFO MainThread: Started query
754969473483b4e9:acfc852300000000
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21000
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21001
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- executing against localhost:21000
-- connecting to: localhost:21000
-- connecting to: localhost:21002
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21001
SET
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21002
-- connecting to: localhost:21000
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (0, 0);
-- executing against localhost:21001
-- connecting to: localhost:21002
-- executing against localhost:21002
-- executing against localhost:21000
-- connecting to: localhost:21001
-- executing against localhost:21001
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (1, 0);
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 0);
-- executing against localhost:21000
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (3, 0);
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (4, 0);
-- executing against localhost:21001
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (5, 0);
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:07,824 INFO Thread-3: Started query
6f4e9d38d1723252:a6a7368f00000000
-- 2022-03-16 09:21:07,859 INFO Thread-4: Started query
33423e0145b7d3e0:ea8d626200000000
-- 2022-03-16 09:21:07,861 INFO Thread-8: Started query
4046d8edde82931b:aa11c84800000000
-- 2022-03-16 09:21:07,875 INFO Thread-9: Started query
594d63b7814c31ab:8f2b92ca00000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 1);
-- 2022-03-16 09:21:08,229 INFO Thread-4: Started query
a94dab601c125bb4:8988934300000000
-- executing against localhost:21000
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 2);
-- 2022-03-16 09:21:08,384 INFO Thread-3: Started query
4040597eba7beb36:4cfa08c800000000
-- 2022-03-16 09:21:08,409 INFO Thread-4: Started query
854e1aee63575861:dbf8bafb00000000
-- executing against localhost:21002
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- executing against localhost:21001
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:08,435 INFO Thread-9: Started query
7e4d4cf54a44cf03:868efcb100000000
-- 2022-03-16 09:21:08,456 INFO Thread-8: Started query
4645788cf2a4d401:0aea969e00000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 3);
-- 2022-03-16 09:21:08,586 INFO Thread-4: Started query
4e4cdd976dfa3358:6c455b7300000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 4);
-- 2022-03-16 09:21:08,711 INFO Thread-4: Started query
6f46942eb8807e5f:ffbb146000000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 5);
-- executing against localhost:21000
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:08,959 INFO Thread-3: Started query
d940fa405a776bef:3705218900000000
-- 2022-03-16 09:21:08,977 INFO Thread-4: Started query
f947061a7c45901c:12b2ba9d00000000
-- executing against localhost:21001
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:08,997 INFO Thread-9: Started query
f54a0aa01bbb2f86:3b89ae9600000000
-- executing against localhost:21002
select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
-- 2022-03-16 09:21:09,147 INFO Thread-8: Started query
fa418523bc0552ce:f0e49b1a00000000
-- executing against localhost:21002
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
values (2, 6);
-- 2022-03-16 09:21:09,250 INFO Thread-4: Started query
fc4751d49d1a1aa2:4bcb48d600000000
-- closing connection to: localhost:21002
Traceback (most recent call last):
File "/home/qchen/Impala.03112022/tests/stress/stress_util.py", line 35, in
run
return self.func(*self.args, **self.kwargs)
File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
276, in _impala_role_concurrent_checker
verify_result_set(result)
File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
269, in verify_result_set
assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), "wid: %d" %
wid
AssertionError: wid: 2
assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
At index 3 diff: 2 != 3
Left contains more items, first extra item: 4
Full diff:
- [0, 1, 2, 2, 3, 4]
? ---
+ [0, 1, 2, 3, 4]
========================================== 1 failed, 1 passed in 158.88 seconds
==========================================
[09:23:33 qchen@qifan-10229: Impala.03112022] git branch
IMPALA-10992-auto-scaling-planner-support
* master
[09:23:41 qchen@qifan-10229: Impala.03112022] git diff
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index a921bb961..6df592f3f 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1644,6 +1644,7 @@ public class Frontend {
markTimelineRetries(attempt, retryMsg, timeline);
return req;
} catch (InconsistentMetadataFetchException e) {
+ LOG.error("BAR: catch an InconsistentMetadataFetchException");
if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
markTimelineRetries(attempt, e.getMessage(), timeline);
throw e;
@@ -1819,14 +1820,18 @@ public class Frontend {
} catch (Exception e) {
if (queryCtx.isSetTransaction_id()) {
try {
+ LOG.error("BAR: to abort Transaction");
abortTransaction(queryCtx.getTransaction_id());
+ LOG.error("BAR: Transaction aborted");
timeline.markEvent("Transaction aborted");
} catch (TransactionException te) {
LOG.error("Could not abort transaction because: " + te.getMessage());
}
} else if (queryCtx.isIs_kudu_transactional()) {
try {
+ LOG.error("BAR: to abort kudu Transaction");
abortKuduTransaction(queryCtx.getQuery_id());
+ LOG.error("BAR: kudu Transaction aborted");
timeline.markEvent(
"Kudu transaction aborted: " +
queryCtx.getQuery_id().toString());
} catch (TransactionException te) {
diff --git a/tests/stress/test_acid_stress.py b/tests/stress/test_acid_stress.py
index f6439ff3c..09ec874e7 100644
--- a/tests/stress/test_acid_stress.py
+++ b/tests/stress/test_acid_stress.py
@@ -46,188 +46,188 @@ class TestAcidStress(ImpalaTestSuite):
v.get_value('table_format').compression_codec == 'none'))
-class TestAcidInsertsBasic(TestAcidStress):
- @classmethod
- def get_workload(self):
- return super(TestAcidInsertsBasic, self).get_workload()
-
- @classmethod
- def add_test_dimensions(cls):
- super(TestAcidInsertsBasic, cls).add_test_dimensions()
-
- def _verify_result(self, result, expected_result):
- """Verify invariants for 'run' and 'i'."""
- assert len(result.data) > 0
- run_max = -1
- i_list = []
- for line in result.data:
- [run, i] = map(int, (line.split('\t')))
- run_max = max(run_max, run)
- i_list.append(i)
- assert expected_result["run"] <= run_max # shouldn't see data overwritten
in the past
- i_list.sort()
- if expected_result["run"] < run_max:
- expected_result["run"] = run_max
- expected_result["i"] = 0
- return
- assert i_list[-1] >= expected_result["i"]
- assert i_list == range(i_list[-1] + 1) # 'i' should have all values from
0 to max_i
- expected_result["i"] = i_list[-1]
-
- def _hive_role_write_inserts(self, tbl_name, partitioned):
- """INSERT INTO/OVERWRITE a table several times from Hive."""
- part_expr = "partition (p=1)" if partitioned else ""
- for run in xrange(0, NUM_OVERWRITES):
- OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
- """ % (tbl_name, part_expr, run, 0)
- self.run_stmt_in_hive(OVERWRITE_SQL)
- for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
- INSERT_SQL = """insert into table %s %s values (%i, %i)
- """ % (tbl_name, part_expr, run, i)
- self.run_stmt_in_hive(INSERT_SQL)
-
- def _impala_role_write_inserts(self, tbl_name, partitioned):
- """INSERT INTO/OVERWRITE a table several times from Impala."""
- try:
- impalad_client = ImpalaTestSuite.create_impala_client()
- part_expr = "partition (p=1)" if partitioned else ""
- for run in xrange(0, NUM_OVERWRITES + 1):
- OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
- """ % (tbl_name, part_expr, run, 0)
- impalad_client.execute(OVERWRITE_SQL)
- for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
- INSERT_SQL = """insert into table %s %s values (%i, %i)
- """ % (tbl_name, part_expr, run, i)
- impalad_client.execute(INSERT_SQL)
- finally:
- impalad_client.close()
-
- def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
- """SELECT from a table many times until the expected final values are
found."""
- try:
- impalad_client = ImpalaTestSuite.create_impala_client()
- expected_result = {"run": -1, "i": 0}
- accept_empty_table = True
- while expected_result["run"] != NUM_OVERWRITES and \
- expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
- time.sleep(sleep_seconds)
- if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
- result = impalad_client.execute("select run, i from %s" % tbl_name)
- if len(result.data) == 0:
- assert accept_empty_table
- continue
- accept_empty_table = False
- self._verify_result(result, expected_result)
- finally:
- impalad_client.close()
-
- def _create_table(self, full_tbl_name, partitioned):
- """Creates test table with name 'full_tbl_name'. Table is partitioned if
- 'partitioned' is set to True."""
- part_expr = "partitioned by (p int)" if partitioned else ""
-
- CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
- 'transactional_properties' = 'insert_only', 'transactional' = 'true')
- """ % (full_tbl_name, part_expr)
- self.client.execute("drop table if exists %s" % full_tbl_name)
- self.client.execute(CREATE_SQL)
-
- def _run_test_read_hive_inserts(self, unique_database, partitioned):
- """Check that Impala can read a single insert only ACID table
(over)written by Hive
- several times. Consistency can be checked by using incremental values for
- overwrites ('run') and inserts ('i').
- """
- tbl_name = "%s.test_read_hive_inserts" % unique_database
- self._create_table(tbl_name, partitioned)
-
- run_tasks([
- Task(self._hive_role_write_inserts, tbl_name, partitioned),
- Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
- sleep_seconds=3)])
-
- def _run_test_read_impala_inserts(self, unique_database, partitioned):
- """Check that Impala can read a single insert only ACID table
(over)written by Hive
- several times. Consistency can be checked by using incremental values for
- overwrites ('run') and inserts ('i').
- """
- tbl_name = "%s.test_read_impala_inserts" % unique_database
- self._create_table(tbl_name, partitioned)
-
- run_tasks([
- Task(self._impala_role_write_inserts, tbl_name, partitioned),
- Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
- sleep_seconds=0.1)])
-
- @SkipIfHive2.acid
- @SkipIfS3.hive
- @SkipIfGCS.hive
- @SkipIfCOS.hive
- @pytest.mark.execute_serially
- @pytest.mark.stress
- def test_read_hive_inserts(self, unique_database):
- """Check that Impala can read partitioned and non-partitioned ACID tables
- written by Hive."""
- for is_partitioned in [False, True]:
- self._run_test_read_hive_inserts(unique_database, is_partitioned)
-
- @SkipIfHive2.acid
- @pytest.mark.execute_serially
- @pytest.mark.stress
- def test_read_impala_inserts(self, unique_database):
- """Check that Impala can read partitioned and non-partitioned ACID tables
- written by Hive."""
- for is_partitioned in [False, True]:
- self._run_test_read_impala_inserts(unique_database, is_partitioned)
-
- def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite,
sleep_sec):
- insert_op = "OVERWRITE" if is_overwrite else "INTO"
- try:
- impalad_client = ImpalaTestSuite.create_impala_client()
- impalad_client.execute(
- """insert {op} table {tbl_name} partition({partition})
- select sleep({sleep_ms})""".format(op=insert_op,
tbl_name=tbl_name,
- partition=partition, sleep_ms=sleep_sec * 1000))
- finally:
- impalad_client.close()
-
- @pytest.mark.execute_serially
- @pytest.mark.stress
- @SkipIf.not_hdfs
- @UniqueDatabase.parametrize(sync_ddl=True)
- def test_partitioned_inserts(self, unique_database):
- """Check that the different ACID write operations take appropriate locks.
- INSERT INTO: should take a shared lock
- INSERT OVERWRITE: should take an exclusive lock
- Both should take PARTITION-level lock in case of static partition
insert."""
- tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
- self.client.set_configuration_option("SYNC_DDL", "true")
- self.client.execute("""
- CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
- TBLPROPERTIES(
-
'transactional_properties'='insert_only','transactional'='true')""".format(
- tbl_name))
- # Warmup INSERT
- self.execute_query("alter table {0} add
partition(p=0,q=0)".format(tbl_name))
- sleep_sec = 5
- task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
- "p=1,q=1", False, sleep_sec)
- # INSERT INTO the same partition can run in parallel.
- duration = run_tasks([task_insert_into, task_insert_into])
- assert duration < 3 * sleep_sec
- task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name,
- "p=1,q=1", True, sleep_sec)
- # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
- duration = run_tasks([task_insert_into, task_insert_overwrite])
- assert duration > 4 * sleep_sec
- # INSERT OVERWRITEs to the same partition should have mutual exclusion.
- duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
- assert duration > 4 * sleep_sec
- task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
tbl_name,
- "p=1,q=2", True, sleep_sec)
- # INSERT OVERWRITEs to different partitions can run in parallel.
- duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
- assert duration < 3 * sleep_sec
-
+#class TestAcidInsertsBasic(TestAcidStress):
+# @classmethod
+# def get_workload(self):
+# return super(TestAcidInsertsBasic, self).get_workload()
+#
+# @classmethod
+# def add_test_dimensions(cls):
+# super(TestAcidInsertsBasic, cls).add_test_dimensions()
+#
+# def _verify_result(self, result, expected_result):
+# """Verify invariants for 'run' and 'i'."""
+# assert len(result.data) > 0
+# run_max = -1
+# i_list = []
+# for line in result.data:
+# [run, i] = map(int, (line.split('\t')))
+# run_max = max(run_max, run)
+# i_list.append(i)
+# assert expected_result["run"] <= run_max # shouldn't see data
overwritten in the past
+# i_list.sort()
+# if expected_result["run"] < run_max:
+# expected_result["run"] = run_max
+# expected_result["i"] = 0
+# return
+# assert i_list[-1] >= expected_result["i"]
+# assert i_list == range(i_list[-1] + 1) # 'i' should have all values from
0 to max_i
+# expected_result["i"] = i_list[-1]
+#
+# def _hive_role_write_inserts(self, tbl_name, partitioned):
+# """INSERT INTO/OVERWRITE a table several times from Hive."""
+# part_expr = "partition (p=1)" if partitioned else ""
+# for run in xrange(0, NUM_OVERWRITES):
+# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
+# """ % (tbl_name, part_expr, run, 0)
+# self.run_stmt_in_hive(OVERWRITE_SQL)
+# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
+# INSERT_SQL = """insert into table %s %s values (%i, %i)
+# """ % (tbl_name, part_expr, run, i)
+# self.run_stmt_in_hive(INSERT_SQL)
+#
+# def _impala_role_write_inserts(self, tbl_name, partitioned):
+# """INSERT INTO/OVERWRITE a table several times from Impala."""
+# try:
+# impalad_client = ImpalaTestSuite.create_impala_client()
+# part_expr = "partition (p=1)" if partitioned else ""
+# for run in xrange(0, NUM_OVERWRITES + 1):
+# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
+# """ % (tbl_name, part_expr, run, 0)
+# impalad_client.execute(OVERWRITE_SQL)
+# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
+# INSERT_SQL = """insert into table %s %s values (%i, %i)
+# """ % (tbl_name, part_expr, run, i)
+# impalad_client.execute(INSERT_SQL)
+# finally:
+# impalad_client.close()
+#
+# def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
+# """SELECT from a table many times until the expected final values are
found."""
+# try:
+# impalad_client = ImpalaTestSuite.create_impala_client()
+# expected_result = {"run": -1, "i": 0}
+# accept_empty_table = True
+# while expected_result["run"] != NUM_OVERWRITES and \
+# expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
+# time.sleep(sleep_seconds)
+# if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
+# result = impalad_client.execute("select run, i from %s" % tbl_name)
+# if len(result.data) == 0:
+# assert accept_empty_table
+# continue
+# accept_empty_table = False
+# self._verify_result(result, expected_result)
+# finally:
+# impalad_client.close()
+#
+# def _create_table(self, full_tbl_name, partitioned):
+# """Creates test table with name 'full_tbl_name'. Table is partitioned if
+# 'partitioned' is set to True."""
+# part_expr = "partitioned by (p int)" if partitioned else ""
+#
+# CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
+# 'transactional_properties' = 'insert_only', 'transactional' = 'true')
+# """ % (full_tbl_name, part_expr)
+# self.client.execute("drop table if exists %s" % full_tbl_name)
+# self.client.execute(CREATE_SQL)
+#
+# def _run_test_read_hive_inserts(self, unique_database, partitioned):
+# """Check that Impala can read a single insert only ACID table
(over)written by Hive
+# several times. Consistency can be checked by using incremental values for
+# overwrites ('run') and inserts ('i').
+# """
+# tbl_name = "%s.test_read_hive_inserts" % unique_database
+# self._create_table(tbl_name, partitioned)
+#
+# run_tasks([
+# Task(self._hive_role_write_inserts, tbl_name, partitioned),
+# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
+# sleep_seconds=3)])
+#
+# def _run_test_read_impala_inserts(self, unique_database, partitioned):
+# """Check that Impala can read a single insert only ACID table
(over)written by Hive
+# several times. Consistency can be checked by using incremental values for
+# overwrites ('run') and inserts ('i').
+# """
+# tbl_name = "%s.test_read_impala_inserts" % unique_database
+# self._create_table(tbl_name, partitioned)
+#
+# run_tasks([
+# Task(self._impala_role_write_inserts, tbl_name, partitioned),
+# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
+# sleep_seconds=0.1)])
+#
+# @SkipIfHive2.acid
+# @SkipIfS3.hive
+# @SkipIfGCS.hive
+# @SkipIfCOS.hive
+# @pytest.mark.execute_serially
+# @pytest.mark.stress
+# def test_read_hive_inserts(self, unique_database):
+# """Check that Impala can read partitioned and non-partitioned ACID tables
+# written by Hive."""
+# for is_partitioned in [False, True]:
+# self._run_test_read_hive_inserts(unique_database, is_partitioned)
+#
+# @SkipIfHive2.acid
+# @pytest.mark.execute_serially
+# @pytest.mark.stress
+# def test_read_impala_inserts(self, unique_database):
+# """Check that Impala can read partitioned and non-partitioned ACID tables
+# written by Hive."""
+# for is_partitioned in [False, True]:
+# self._run_test_read_impala_inserts(unique_database, is_partitioned)
+#
+# def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite,
sleep_sec):
+# insert_op = "OVERWRITE" if is_overwrite else "INTO"
+# try:
+# impalad_client = ImpalaTestSuite.create_impala_client()
+# impalad_client.execute(
+# """insert {op} table {tbl_name} partition({partition})
+# select sleep({sleep_ms})""".format(op=insert_op,
tbl_name=tbl_name,
+# partition=partition, sleep_ms=sleep_sec * 1000))
+# finally:
+# impalad_client.close()
+#
+# @pytest.mark.execute_serially
+# @pytest.mark.stress
+# @SkipIf.not_hdfs
+# @UniqueDatabase.parametrize(sync_ddl=True)
+# def test_partitioned_inserts(self, unique_database):
+# """Check that the different ACID write operations take appropriate locks.
+# INSERT INTO: should take a shared lock
+# INSERT OVERWRITE: should take an exclusive lock
+# Both should take PARTITION-level lock in case of static partition
insert."""
+# tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
+# self.client.set_configuration_option("SYNC_DDL", "true")
+# self.client.execute("""
+# CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
+# TBLPROPERTIES(
+#
'transactional_properties'='insert_only','transactional'='true')""".format(
+# tbl_name))
+# # Warmup INSERT
+# self.execute_query("alter table {0} add
partition(p=0,q=0)".format(tbl_name))
+# sleep_sec = 5
+# task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
+# "p=1,q=1", False, sleep_sec)
+# # INSERT INTO the same partition can run in parallel.
+# duration = run_tasks([task_insert_into, task_insert_into])
+# assert duration < 3 * sleep_sec
+# task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name,
+# "p=1,q=1", True, sleep_sec)
+# # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
+# duration = run_tasks([task_insert_into, task_insert_overwrite])
+# assert duration > 4 * sleep_sec
+# # INSERT OVERWRITEs to the same partition should have mutual exclusion.
+# duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
+# assert duration > 4 * sleep_sec
+# task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
tbl_name,
+# "p=1,q=2", True, sleep_sec)
+# # INSERT OVERWRITEs to different partitions can run in parallel.
+# duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
+# assert duration < 3 * sleep_sec
+#
class TestConcurrentAcidInserts(TestAcidStress):
@classmethod
(END)
{code}
Summary: Concurrent insert ACID tests are broken in local catalog mode
(was: Concurrent insert ACL tests are broken in local catalog mode)
> Concurrent insert ACID tests are broken in local catalog mode
> -------------------------------------------------------------
>
> Key: IMPALA-11189
> URL: https://issues.apache.org/jira/browse/IMPALA-11189
> Project: IMPALA
> Issue Type: Bug
> Components: Catalog
> Reporter: Qifan Chen
> Priority: Major
>
> Stress test test_concurrent_inserts (in tests/stress/test_acid_stress.py) can
> fail repeatedly in local catalog mode. In this case, the concurrent checker
> query (select * from <table>) returns duplicated rows such as reported below,
> where row [0,2] is duplicated.
> The failure can be reproduced quite easily by running the test (i.e.,
> TestConcurrentAcidInserts) first, by commenting out all the tests prior to
> it in the test file tests/stress/test_acid_stress.py.
> Setup:
> 1. Build Impala and clear HMS in case it is in a bad state:
> $IMPALA_HOME/buildall.sh -format_metastore -notests
> 2. Start the cluster in local catalog mode:
> $IMPALA_HOME/bin/start-impala-cluster.py --impalad_args
> --use_local_catalog=true --catalogd_args --catalog_topic_mode=minimal
> --catalogd_args --hms_event_polling_interval_s=1
> 3. Run the modified stress test: $IMPALA_HOME/bin/impala-py.test
> $IMPALA_TESTS/stress/test_acid_stress.py
> Error reported:
> {code:java}
> 09:11:00 qchen@qifan-10229: Impala.03112022] test_acid_stress
> rootLoggerLevel = INFO
> ================================================== test session starts
> ===================================================
> platform linux2 -- Python 2.7.16, pytest-2.9.2, py-1.4.32, pluggy-0.3.1 --
> /home/qchen/Impala.03112022/infra/python/env-gcc7.5.0/bin/python
> cachedir: tests/.cache
> rootdir: /home/qchen/Impala.03112022/tests, inifile: pytest.ini
> plugins: xdist-1.17.1, timeout-1.2.1, random-0.2, forked-0.2
> timeout: 7200s method: signal
> collected 2 items
> tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::test_concurrent_inserts[unique_database0]
> FAILED
> tests/stress/test_acid_stress.py::TestFailingAcidInserts::test_failing_inserts[unique_database0]
> PASSED
> ================================================ short test summary info
> =================================================
> FAIL
> tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]
> ======================================================== FAILURES
> ========================================================
> __________________________
> TestConcurrentAcidInserts.test_concurrent_inserts[unique_database0]
> ___________________________
> tests/stress/test_acid_stress.py:307: in test_concurrent_inserts
> run_tasks(writers + checkers)
> tests/stress/stress_util.py:45: in run_tasks
> pool.map_async(Task.run, tasks).get(timeout_seconds)
> ../Impala.03082022/toolchain/toolchain-packages-gcc7.5.0/python-2.7.16/lib/python2.7/multiprocessing/pool.py:572:
> in get
> raise self._value
> E AssertionError: wid: 2
> E assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
> E At index 3 diff: 2 != 3
> E Left contains more items, first extra item: 4
> E Full diff:
> E - [0, 1, 2, 2, 3, 4]
> E ? ---
> E + [0, 1, 2, 3, 4]
> ------------------------------------------------- Captured stderr setup
> --------------------------------------------------
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21000
> -- connecting to localhost:21050 with impyla
> -- 2022-03-16 09:20:54,762 INFO MainThread: Closing active operation
> -- connecting to localhost:28000 with impyla
> -- 2022-03-16 09:20:54,774 INFO MainThread: Closing active operation
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET sync_ddl=True;
> -- executing against localhost:21000
> DROP DATABASE IF EXISTS `test_concurrent_inserts_8933345c` CASCADE;
> -- 2022-03-16 09:20:54,808 INFO MainThread: Started query
> 28457f4c7e77cdec:c6d3731900000000
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET sync_ddl=True;
> -- executing against localhost:21000
> CREATE DATABASE `test_concurrent_inserts_8933345c`;
> -- 2022-03-16 09:20:54,877 INFO MainThread: Started query
> 374bf99aea680523:48d2405400000000
> -- 2022-03-16 09:21:01,164 INFO MainThread: Created database
> "test_concurrent_inserts_8933345c" for test ID
> "stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]"
> -------------------------------------------------- Captured stderr call
> --------------------------------------------------
> SET SYNC_DDL=true;
> -- executing against localhost:21000
> drop table if exists test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:01,173 INFO MainThread: Started query
> 20480c2a1d336d35:c2d84edd00000000
> -- executing against localhost:21000
> create table test_concurrent_inserts_8933345c.test_concurrent_inserts (wid
> int, i int) TBLPROPERTIES (
> 'transactional_properties' = 'insert_only', 'transactional' = 'true')
> ;
> -- 2022-03-16 09:21:01,294 INFO MainThread: Started query
> 754969473483b4e9:acfc852300000000
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21000
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21001
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- executing against localhost:21000
> -- connecting to: localhost:21000
> -- connecting to: localhost:21002
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21001
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21002
> -- connecting to: localhost:21000
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (0, 0);
> -- executing against localhost:21001
> -- connecting to: localhost:21002
> -- executing against localhost:21002
> -- executing against localhost:21000
> -- connecting to: localhost:21001
> -- executing against localhost:21001
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (1, 0);
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 0);
> -- executing against localhost:21000
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (3, 0);
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (4, 0);
> -- executing against localhost:21001
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (5, 0);
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:07,824 INFO Thread-3: Started query
> 6f4e9d38d1723252:a6a7368f00000000
> -- 2022-03-16 09:21:07,859 INFO Thread-4: Started query
> 33423e0145b7d3e0:ea8d626200000000
> -- 2022-03-16 09:21:07,861 INFO Thread-8: Started query
> 4046d8edde82931b:aa11c84800000000
> -- 2022-03-16 09:21:07,875 INFO Thread-9: Started query
> 594d63b7814c31ab:8f2b92ca00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 1);
> -- 2022-03-16 09:21:08,229 INFO Thread-4: Started query
> a94dab601c125bb4:8988934300000000
> -- executing against localhost:21000
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 2);
> -- 2022-03-16 09:21:08,384 INFO Thread-3: Started query
> 4040597eba7beb36:4cfa08c800000000
> -- 2022-03-16 09:21:08,409 INFO Thread-4: Started query
> 854e1aee63575861:dbf8bafb00000000
> -- executing against localhost:21002
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- executing against localhost:21001
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,435 INFO Thread-9: Started query
> 7e4d4cf54a44cf03:868efcb100000000
> -- 2022-03-16 09:21:08,456 INFO Thread-8: Started query
> 4645788cf2a4d401:0aea969e00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 3);
> -- 2022-03-16 09:21:08,586 INFO Thread-4: Started query
> 4e4cdd976dfa3358:6c455b7300000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 4);
> -- 2022-03-16 09:21:08,711 INFO Thread-4: Started query
> 6f46942eb8807e5f:ffbb146000000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 5);
> -- executing against localhost:21000
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,959 INFO Thread-3: Started query
> d940fa405a776bef:3705218900000000
> -- 2022-03-16 09:21:08,977 INFO Thread-4: Started query
> f947061a7c45901c:12b2ba9d00000000
> -- executing against localhost:21001
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,997 INFO Thread-9: Started query
> f54a0aa01bbb2f86:3b89ae9600000000
> -- executing against localhost:21002
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:09,147 INFO Thread-8: Started query
> fa418523bc0552ce:f0e49b1a00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 6);
> -- 2022-03-16 09:21:09,250 INFO Thread-4: Started query
> fc4751d49d1a1aa2:4bcb48d600000000
> -- closing connection to: localhost:21002
> Traceback (most recent call last):
> File "/home/qchen/Impala.03112022/tests/stress/stress_util.py", line 35, in
> run
> return self.func(*self.args, **self.kwargs)
> File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
> 276, in _impala_role_concurrent_checker
> verify_result_set(result)
> File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
> 269, in verify_result_set
> assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), "wid: %d"
> % wid
> AssertionError: wid: 2
> assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
> At index 3 diff: 2 != 3
> Left contains more items, first extra item: 4
> Full diff:
> - [0, 1, 2, 2, 3, 4]
> ? ---
> + [0, 1, 2, 3, 4]
> ========================================== 1 failed, 1 passed in 158.88
> seconds ==========================================
> [09:23:33 qchen@qifan-10229: Impala.03112022] git branch
> IMPALA-10992-auto-scaling-planner-support
> * master
> [09:23:41 qchen@qifan-10229: Impala.03112022] git diff
> diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
> b/fe/src/main/java/org/apache/impala/service/Frontend.java
> index a921bb961..6df592f3f 100644
> --- a/fe/src/main/java/org/apache/impala/service/Frontend.java
> +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
> @@ -1644,6 +1644,7 @@ public class Frontend {
> markTimelineRetries(attempt, retryMsg, timeline);
> return req;
> } catch (InconsistentMetadataFetchException e) {
> + LOG.error("BAR: catch an InconsistentMetadataFetchException");
> if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
> markTimelineRetries(attempt, e.getMessage(), timeline);
> throw e;
> @@ -1819,14 +1820,18 @@ public class Frontend {
> } catch (Exception e) {
> if (queryCtx.isSetTransaction_id()) {
> try {
> + LOG.error("BAR: to abort Transaction");
> abortTransaction(queryCtx.getTransaction_id());
> + LOG.error("BAR: Transaction aborted");
> timeline.markEvent("Transaction aborted");
> } catch (TransactionException te) {
> LOG.error("Could not abort transaction because: " +
> te.getMessage());
> }
> } else if (queryCtx.isIs_kudu_transactional()) {
> try {
> + LOG.error("BAR: to abort kudu Transaction");
> abortKuduTransaction(queryCtx.getQuery_id());
> + LOG.error("BAR: kudu Transaction aborted");
> timeline.markEvent(
> "Kudu transaction aborted: " +
> queryCtx.getQuery_id().toString());
> } catch (TransactionException te) {
> diff --git a/tests/stress/test_acid_stress.py
> b/tests/stress/test_acid_stress.py
> index f6439ff3c..09ec874e7 100644
> --- a/tests/stress/test_acid_stress.py
> +++ b/tests/stress/test_acid_stress.py
> @@ -46,188 +46,188 @@ class TestAcidStress(ImpalaTestSuite):
> v.get_value('table_format').compression_codec == 'none'))
>
>
> -class TestAcidInsertsBasic(TestAcidStress):
> - @classmethod
> - def get_workload(self):
> - return super(TestAcidInsertsBasic, self).get_workload()
> -
> - @classmethod
> - def add_test_dimensions(cls):
> - super(TestAcidInsertsBasic, cls).add_test_dimensions()
> -
> - def _verify_result(self, result, expected_result):
> - """Verify invariants for 'run' and 'i'."""
> - assert len(result.data) > 0
> - run_max = -1
> - i_list = []
> - for line in result.data:
> - [run, i] = map(int, (line.split('\t')))
> - run_max = max(run_max, run)
> - i_list.append(i)
> - assert expected_result["run"] <= run_max # shouldn't see data
> overwritten in the past
> - i_list.sort()
> - if expected_result["run"] < run_max:
> - expected_result["run"] = run_max
> - expected_result["i"] = 0
> - return
> - assert i_list[-1] >= expected_result["i"]
> - assert i_list == range(i_list[-1] + 1) # 'i' should have all values
> from 0 to max_i
> - expected_result["i"] = i_list[-1]
> -
> - def _hive_role_write_inserts(self, tbl_name, partitioned):
> - """INSERT INTO/OVERWRITE a table several times from Hive."""
> - part_expr = "partition (p=1)" if partitioned else ""
> - for run in xrange(0, NUM_OVERWRITES):
> - OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, 0)
> - self.run_stmt_in_hive(OVERWRITE_SQL)
> - for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> - INSERT_SQL = """insert into table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, i)
> - self.run_stmt_in_hive(INSERT_SQL)
> -
> - def _impala_role_write_inserts(self, tbl_name, partitioned):
> - """INSERT INTO/OVERWRITE a table several times from Impala."""
> - try:
> - impalad_client = ImpalaTestSuite.create_impala_client()
> - part_expr = "partition (p=1)" if partitioned else ""
> - for run in xrange(0, NUM_OVERWRITES + 1):
> - OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, 0)
> - impalad_client.execute(OVERWRITE_SQL)
> - for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> - INSERT_SQL = """insert into table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, i)
> - impalad_client.execute(INSERT_SQL)
> - finally:
> - impalad_client.close()
> -
> - def _impala_role_read_inserts(self, tbl_name, needs_refresh,
> sleep_seconds):
> - """SELECT from a table many times until the expected final values are
> found."""
> - try:
> - impalad_client = ImpalaTestSuite.create_impala_client()
> - expected_result = {"run": -1, "i": 0}
> - accept_empty_table = True
> - while expected_result["run"] != NUM_OVERWRITES and \
> - expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
> - time.sleep(sleep_seconds)
> - if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
> - result = impalad_client.execute("select run, i from %s" % tbl_name)
> - if len(result.data) == 0:
> - assert accept_empty_table
> - continue
> - accept_empty_table = False
> - self._verify_result(result, expected_result)
> - finally:
> - impalad_client.close()
> -
> - def _create_table(self, full_tbl_name, partitioned):
> - """Creates test table with name 'full_tbl_name'. Table is partitioned if
> - 'partitioned' is set to True."""
> - part_expr = "partitioned by (p int)" if partitioned else ""
> -
> - CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
> - 'transactional_properties' = 'insert_only', 'transactional' =
> 'true')
> - """ % (full_tbl_name, part_expr)
> - self.client.execute("drop table if exists %s" % full_tbl_name)
> - self.client.execute(CREATE_SQL)
> -
> - def _run_test_read_hive_inserts(self, unique_database, partitioned):
> - """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> - several times. Consistency can be checked by using incremental values for
> - overwrites ('run') and inserts ('i').
> - """
> - tbl_name = "%s.test_read_hive_inserts" % unique_database
> - self._create_table(tbl_name, partitioned)
> -
> - run_tasks([
> - Task(self._hive_role_write_inserts, tbl_name, partitioned),
> - Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
> - sleep_seconds=3)])
> -
> - def _run_test_read_impala_inserts(self, unique_database, partitioned):
> - """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> - several times. Consistency can be checked by using incremental values for
> - overwrites ('run') and inserts ('i').
> - """
> - tbl_name = "%s.test_read_impala_inserts" % unique_database
> - self._create_table(tbl_name, partitioned)
> -
> - run_tasks([
> - Task(self._impala_role_write_inserts, tbl_name, partitioned),
> - Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
> - sleep_seconds=0.1)])
> -
> - @SkipIfHive2.acid
> - @SkipIfS3.hive
> - @SkipIfGCS.hive
> - @SkipIfCOS.hive
> - @pytest.mark.execute_serially
> - @pytest.mark.stress
> - def test_read_hive_inserts(self, unique_database):
> - """Check that Impala can read partitioned and non-partitioned ACID tables
> - written by Hive."""
> - for is_partitioned in [False, True]:
> - self._run_test_read_hive_inserts(unique_database, is_partitioned)
> -
> - @SkipIfHive2.acid
> - @pytest.mark.execute_serially
> - @pytest.mark.stress
> - def test_read_impala_inserts(self, unique_database):
> - """Check that Impala can read partitioned and non-partitioned ACID tables
> - written by Hive."""
> - for is_partitioned in [False, True]:
> - self._run_test_read_impala_inserts(unique_database, is_partitioned)
> -
> - def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite,
> sleep_sec):
> - insert_op = "OVERWRITE" if is_overwrite else "INTO"
> - try:
> - impalad_client = ImpalaTestSuite.create_impala_client()
> - impalad_client.execute(
> - """insert {op} table {tbl_name} partition({partition})
> - select sleep({sleep_ms})""".format(op=insert_op,
> tbl_name=tbl_name,
> - partition=partition, sleep_ms=sleep_sec * 1000))
> - finally:
> - impalad_client.close()
> -
> - @pytest.mark.execute_serially
> - @pytest.mark.stress
> - @SkipIf.not_hdfs
> - @UniqueDatabase.parametrize(sync_ddl=True)
> - def test_partitioned_inserts(self, unique_database):
> - """Check that the different ACID write operations take appropriate locks.
> - INSERT INTO: should take a shared lock
> - INSERT OVERWRITE: should take an exclusive lock
> - Both should take PARTITION-level lock in case of static partition
> insert."""
> - tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
> - self.client.set_configuration_option("SYNC_DDL", "true")
> - self.client.execute("""
> - CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
> - TBLPROPERTIES(
> -
> 'transactional_properties'='insert_only','transactional'='true')""".format(
> - tbl_name))
> - # Warmup INSERT
> - self.execute_query("alter table {0} add
> partition(p=0,q=0)".format(tbl_name))
> - sleep_sec = 5
> - task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
> - "p=1,q=1", False, sleep_sec)
> - # INSERT INTO the same partition can run in parallel.
> - duration = run_tasks([task_insert_into, task_insert_into])
> - assert duration < 3 * sleep_sec
> - task_insert_overwrite = Task(self._impala_role_partition_writer,
> tbl_name,
> - "p=1,q=1", True, sleep_sec)
> - # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
> - duration = run_tasks([task_insert_into, task_insert_overwrite])
> - assert duration > 4 * sleep_sec
> - # INSERT OVERWRITEs to the same partition should have mutual exclusion.
> - duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
> - assert duration > 4 * sleep_sec
> - task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
> tbl_name,
> - "p=1,q=2", True, sleep_sec)
> - # INSERT OVERWRITEs to different partitions can run in parallel.
> - duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
> - assert duration < 3 * sleep_sec
> -
> +#class TestAcidInsertsBasic(TestAcidStress):
> +# @classmethod
> +# def get_workload(self):
> +# return super(TestAcidInsertsBasic, self).get_workload()
> +#
> +# @classmethod
> +# def add_test_dimensions(cls):
> +# super(TestAcidInsertsBasic, cls).add_test_dimensions()
> +#
> +# def _verify_result(self, result, expected_result):
> +# """Verify invariants for 'run' and 'i'."""
> +# assert len(result.data) > 0
> +# run_max = -1
> +# i_list = []
> +# for line in result.data:
> +# [run, i] = map(int, (line.split('\t')))
> +# run_max = max(run_max, run)
> +# i_list.append(i)
> +# assert expected_result["run"] <= run_max # shouldn't see data
> overwritten in the past
> +# i_list.sort()
> +# if expected_result["run"] < run_max:
> +# expected_result["run"] = run_max
> +# expected_result["i"] = 0
> +# return
> +# assert i_list[-1] >= expected_result["i"]
> +# assert i_list == range(i_list[-1] + 1) # 'i' should have all values
> from 0 to max_i
> +# expected_result["i"] = i_list[-1]
> +#
> +# def _hive_role_write_inserts(self, tbl_name, partitioned):
> +# """INSERT INTO/OVERWRITE a table several times from Hive."""
> +# part_expr = "partition (p=1)" if partitioned else ""
> +# for run in xrange(0, NUM_OVERWRITES):
> +# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, 0)
> +# self.run_stmt_in_hive(OVERWRITE_SQL)
> +# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> +# INSERT_SQL = """insert into table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, i)
> +# self.run_stmt_in_hive(INSERT_SQL)
> +#
> +# def _impala_role_write_inserts(self, tbl_name, partitioned):
> +# """INSERT INTO/OVERWRITE a table several times from Impala."""
> +# try:
> +# impalad_client = ImpalaTestSuite.create_impala_client()
> +# part_expr = "partition (p=1)" if partitioned else ""
> +# for run in xrange(0, NUM_OVERWRITES + 1):
> +# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, 0)
> +# impalad_client.execute(OVERWRITE_SQL)
> +# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> +# INSERT_SQL = """insert into table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, i)
> +# impalad_client.execute(INSERT_SQL)
> +# finally:
> +# impalad_client.close()
> +#
> +# def _impala_role_read_inserts(self, tbl_name, needs_refresh,
> sleep_seconds):
> +# """SELECT from a table many times until the expected final values are
> found."""
> +# try:
> +# impalad_client = ImpalaTestSuite.create_impala_client()
> +# expected_result = {"run": -1, "i": 0}
> +# accept_empty_table = True
> +# while expected_result["run"] != NUM_OVERWRITES and \
> +# expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
> +# time.sleep(sleep_seconds)
> +# if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
> +# result = impalad_client.execute("select run, i from %s" % tbl_name)
> +# if len(result.data) == 0:
> +# assert accept_empty_table
> +# continue
> +# accept_empty_table = False
> +# self._verify_result(result, expected_result)
> +# finally:
> +# impalad_client.close()
> +#
> +# def _create_table(self, full_tbl_name, partitioned):
> +# """Creates test table with name 'full_tbl_name'. Table is partitioned if
> +# 'partitioned' is set to True."""
> +# part_expr = "partitioned by (p int)" if partitioned else ""
> +#
> +# CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
> +# 'transactional_properties' = 'insert_only', 'transactional' =
> 'true')
> +# """ % (full_tbl_name, part_expr)
> +# self.client.execute("drop table if exists %s" % full_tbl_name)
> +# self.client.execute(CREATE_SQL)
> +#
> +# def _run_test_read_hive_inserts(self, unique_database, partitioned):
> +# """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> +# several times. Consistency can be checked by using incremental values
> for
> +# overwrites ('run') and inserts ('i').
> +# """
> +# tbl_name = "%s.test_read_hive_inserts" % unique_database
> +# self._create_table(tbl_name, partitioned)
> +#
> +# run_tasks([
> +# Task(self._hive_role_write_inserts, tbl_name, partitioned),
> +# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
> +# sleep_seconds=3)])
> +#
> +# def _run_test_read_impala_inserts(self, unique_database, partitioned):
> +# """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> +# several times. Consistency can be checked by using incremental values
> for
> +# overwrites ('run') and inserts ('i').
> +# """
> +# tbl_name = "%s.test_read_impala_inserts" % unique_database
> +# self._create_table(tbl_name, partitioned)
> +#
> +# run_tasks([
> +# Task(self._impala_role_write_inserts, tbl_name, partitioned),
> +# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
> +# sleep_seconds=0.1)])
> +#
> +# @SkipIfHive2.acid
> +# @SkipIfS3.hive
> +# @SkipIfGCS.hive
> +# @SkipIfCOS.hive
> +# @pytest.mark.execute_serially
> +# @pytest.mark.stress
> +# def test_read_hive_inserts(self, unique_database):
> +# """Check that Impala can read partitioned and non-partitioned ACID
> tables
> +# written by Hive."""
> +# for is_partitioned in [False, True]:
> +# self._run_test_read_hive_inserts(unique_database, is_partitioned)
> +#
> +# @SkipIfHive2.acid
> +# @pytest.mark.execute_serially
> +# @pytest.mark.stress
> +# def test_read_impala_inserts(self, unique_database):
> +# """Check that Impala can read partitioned and non-partitioned ACID
> tables
> +# written by Hive."""
> +# for is_partitioned in [False, True]:
> +# self._run_test_read_impala_inserts(unique_database, is_partitioned)
> +#
> +# def _impala_role_partition_writer(self, tbl_name, partition,
> is_overwrite, sleep_sec):
> +# insert_op = "OVERWRITE" if is_overwrite else "INTO"
> +# try:
> +# impalad_client = ImpalaTestSuite.create_impala_client()
> +# impalad_client.execute(
> +# """insert {op} table {tbl_name} partition({partition})
> +# select sleep({sleep_ms})""".format(op=insert_op,
> tbl_name=tbl_name,
> +# partition=partition, sleep_ms=sleep_sec * 1000))
> +# finally:
> +# impalad_client.close()
> +#
> +# @pytest.mark.execute_serially
> +# @pytest.mark.stress
> +# @SkipIf.not_hdfs
> +# @UniqueDatabase.parametrize(sync_ddl=True)
> +# def test_partitioned_inserts(self, unique_database):
> +# """Check that the different ACID write operations take appropriate
> locks.
> +# INSERT INTO: should take a shared lock
> +# INSERT OVERWRITE: should take an exclusive lock
> +# Both should take PARTITION-level lock in case of static partition
> insert."""
> +# tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
> +# self.client.set_configuration_option("SYNC_DDL", "true")
> +# self.client.execute("""
> +# CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
> +# TBLPROPERTIES(
> +#
> 'transactional_properties'='insert_only','transactional'='true')""".format(
> +# tbl_name))
> +# # Warmup INSERT
> +# self.execute_query("alter table {0} add
> partition(p=0,q=0)".format(tbl_name))
> +# sleep_sec = 5
> +# task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
> +# "p=1,q=1", False, sleep_sec)
> +# # INSERT INTO the same partition can run in parallel.
> +# duration = run_tasks([task_insert_into, task_insert_into])
> +# assert duration < 3 * sleep_sec
> +# task_insert_overwrite = Task(self._impala_role_partition_writer,
> tbl_name,
> +# "p=1,q=1", True, sleep_sec)
> +# # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
> +# duration = run_tasks([task_insert_into, task_insert_overwrite])
> +# assert duration > 4 * sleep_sec
> +# # INSERT OVERWRITEs to the same partition should have mutual exclusion.
> +# duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
> +# assert duration > 4 * sleep_sec
> +# task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
> tbl_name,
> +# "p=1,q=2", True, sleep_sec)
> +# # INSERT OVERWRITEs to different partitions can run in parallel.
> +# duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
> +# assert duration < 3 * sleep_sec
> +#
>
> class TestConcurrentAcidInserts(TestAcidStress):
> @classmethod
> (END)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]