[
https://issues.apache.org/jira/browse/IMPALA-11189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509140#comment-17509140
]
Quanlong Huang commented on IMPALA-11189:
-----------------------------------------
Maybe I'm missing something, but I can't reproduce the issue.
Could you upload the catalogd and impalad log files? [~sql_forever]
> Concurrent insert ACID tests are broken in local catalog mode
> -------------------------------------------------------------
>
> Key: IMPALA-11189
> URL: https://issues.apache.org/jira/browse/IMPALA-11189
> Project: IMPALA
> Issue Type: Bug
> Components: Catalog
> Reporter: Qifan Chen
> Priority: Major
>
> The stress test test_concurrent_inserts (in tests/stress/test_acid_stress.py)
> can fail repeatedly in local catalog mode. In this case, the concurrent checker
> query (select * from <table>) returns duplicated rows, as reported below,
> where row [0,2] is duplicated.
> The failure can be reproduced quite easily by running TestConcurrentAcidInserts
> first, i.e. by commenting out all the tests that precede it in
> tests/stress/test_acid_stress.py.
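> The invariant the checker enforces can be sketched in isolation (a minimal,
> standalone approximation of the check in verify_result_set in
> tests/stress/test_acid_stress.py; the function name verify_contiguous here
> is illustrative, not the test's actual helper):

```python
def verify_contiguous(values, wid):
    """Check that the 'i' values observed for one writer id form a
    contiguous, duplicate-free range [min..max]."""
    sorted_run = sorted(values)
    expected = list(range(sorted_run[0], sorted_run[-1] + 1))
    # A duplicated row, e.g. [0, 1, 2, 2, 3, 4], makes the two lists
    # differ and trips the assertion, matching the failure reported below.
    assert sorted_run == expected, "wid: %d" % wid
    return True

# A consistent snapshot passes:
verify_contiguous([4, 0, 2, 1, 3], wid=2)
```

> The failing snapshot in the report, [0, 1, 2, 2, 3, 4], would raise
> AssertionError with the message "wid: 2".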
> Setup:
> 1. Build Impala and reformat the metastore, in case HMS is in a bad state:
> $IMPALA_HOME/buildall.sh -format_metastore -notests
> 2. Start the cluster in local catalog mode:
> $IMPALA_HOME/bin/start-impala-cluster.py --impalad_args --use_local_catalog=true --catalogd_args --catalog_topic_mode=minimal --catalogd_args --hms_event_polling_interval_s=1
> 3. Run the modified stress test:
> $IMPALA_HOME/bin/impala-py.test $IMPALA_TESTS/stress/test_acid_stress.py
> Error reported:
> {code:java}
> 09:11:00 qchen@qifan-10229: Impala.03112022] test_acid_stress
> rootLoggerLevel = INFO
> ================================================== test session starts
> ===================================================
> platform linux2 -- Python 2.7.16, pytest-2.9.2, py-1.4.32, pluggy-0.3.1 --
> /home/qchen/Impala.03112022/infra/python/env-gcc7.5.0/bin/python
> cachedir: tests/.cache
> rootdir: /home/qchen/Impala.03112022/tests, inifile: pytest.ini
> plugins: xdist-1.17.1, timeout-1.2.1, random-0.2, forked-0.2
> timeout: 7200s method: signal
> collected 2 items
> tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::test_concurrent_inserts[unique_database0]
> FAILED
> tests/stress/test_acid_stress.py::TestFailingAcidInserts::test_failing_inserts[unique_database0]
> PASSED
> ================================================ short test summary info
> =================================================
> FAIL
> tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]
> ======================================================== FAILURES
> ========================================================
> __________________________
> TestConcurrentAcidInserts.test_concurrent_inserts[unique_database0]
> ___________________________
> tests/stress/test_acid_stress.py:307: in test_concurrent_inserts
> run_tasks(writers + checkers)
> tests/stress/stress_util.py:45: in run_tasks
> pool.map_async(Task.run, tasks).get(timeout_seconds)
> ../Impala.03082022/toolchain/toolchain-packages-gcc7.5.0/python-2.7.16/lib/python2.7/multiprocessing/pool.py:572:
> in get
> raise self._value
> E AssertionError: wid: 2
> E assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
> E At index 3 diff: 2 != 3
> E Left contains more items, first extra item: 4
> E Full diff:
> E - [0, 1, 2, 2, 3, 4]
> E ? ---
> E + [0, 1, 2, 3, 4]
> ------------------------------------------------- Captured stderr setup
> --------------------------------------------------
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21000
> -- connecting to localhost:21050 with impyla
> -- 2022-03-16 09:20:54,762 INFO MainThread: Closing active operation
> -- connecting to localhost:28000 with impyla
> -- 2022-03-16 09:20:54,774 INFO MainThread: Closing active operation
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET sync_ddl=True;
> -- executing against localhost:21000
> DROP DATABASE IF EXISTS `test_concurrent_inserts_8933345c` CASCADE;
> -- 2022-03-16 09:20:54,808 INFO MainThread: Started query
> 28457f4c7e77cdec:c6d3731900000000
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET sync_ddl=True;
> -- executing against localhost:21000
> CREATE DATABASE `test_concurrent_inserts_8933345c`;
> -- 2022-03-16 09:20:54,877 INFO MainThread: Started query
> 374bf99aea680523:48d2405400000000
> -- 2022-03-16 09:21:01,164 INFO MainThread: Created database
> "test_concurrent_inserts_8933345c" for test ID
> "stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]"
> -------------------------------------------------- Captured stderr call
> --------------------------------------------------
> SET SYNC_DDL=true;
> -- executing against localhost:21000
> drop table if exists test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:01,173 INFO MainThread: Started query
> 20480c2a1d336d35:c2d84edd00000000
> -- executing against localhost:21000
> create table test_concurrent_inserts_8933345c.test_concurrent_inserts (wid
> int, i int) TBLPROPERTIES (
> 'transactional_properties' = 'insert_only', 'transactional' = 'true')
> ;
> -- 2022-03-16 09:21:01,294 INFO MainThread: Started query
> 754969473483b4e9:acfc852300000000
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21000
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21001
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- executing against localhost:21000
> -- connecting to: localhost:21000
> -- connecting to: localhost:21002
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21001
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21002
> -- connecting to: localhost:21000
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (0, 0);
> -- executing against localhost:21001
> -- connecting to: localhost:21002
> -- executing against localhost:21002
> -- executing against localhost:21000
> -- connecting to: localhost:21001
> -- executing against localhost:21001
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (1, 0);
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 0);
> -- executing against localhost:21000
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (3, 0);
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (4, 0);
> -- executing against localhost:21001
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (5, 0);
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:07,824 INFO Thread-3: Started query
> 6f4e9d38d1723252:a6a7368f00000000
> -- 2022-03-16 09:21:07,859 INFO Thread-4: Started query
> 33423e0145b7d3e0:ea8d626200000000
> -- 2022-03-16 09:21:07,861 INFO Thread-8: Started query
> 4046d8edde82931b:aa11c84800000000
> -- 2022-03-16 09:21:07,875 INFO Thread-9: Started query
> 594d63b7814c31ab:8f2b92ca00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 1);
> -- 2022-03-16 09:21:08,229 INFO Thread-4: Started query
> a94dab601c125bb4:8988934300000000
> -- executing against localhost:21000
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 2);
> -- 2022-03-16 09:21:08,384 INFO Thread-3: Started query
> 4040597eba7beb36:4cfa08c800000000
> -- 2022-03-16 09:21:08,409 INFO Thread-4: Started query
> 854e1aee63575861:dbf8bafb00000000
> -- executing against localhost:21002
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- executing against localhost:21001
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,435 INFO Thread-9: Started query
> 7e4d4cf54a44cf03:868efcb100000000
> -- 2022-03-16 09:21:08,456 INFO Thread-8: Started query
> 4645788cf2a4d401:0aea969e00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 3);
> -- 2022-03-16 09:21:08,586 INFO Thread-4: Started query
> 4e4cdd976dfa3358:6c455b7300000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 4);
> -- 2022-03-16 09:21:08,711 INFO Thread-4: Started query
> 6f46942eb8807e5f:ffbb146000000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 5);
> -- executing against localhost:21000
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,959 INFO Thread-3: Started query
> d940fa405a776bef:3705218900000000
> -- 2022-03-16 09:21:08,977 INFO Thread-4: Started query
> f947061a7c45901c:12b2ba9d00000000
> -- executing against localhost:21001
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,997 INFO Thread-9: Started query
> f54a0aa01bbb2f86:3b89ae9600000000
> -- executing against localhost:21002
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:09,147 INFO Thread-8: Started query
> fa418523bc0552ce:f0e49b1a00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 6);
> -- 2022-03-16 09:21:09,250 INFO Thread-4: Started query
> fc4751d49d1a1aa2:4bcb48d600000000
> -- closing connection to: localhost:21002
> Traceback (most recent call last):
> File "/home/qchen/Impala.03112022/tests/stress/stress_util.py", line 35, in
> run
> return self.func(*self.args, **self.kwargs)
> File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
> 276, in _impala_role_concurrent_checker
> verify_result_set(result)
> File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
> 269, in verify_result_set
> assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), "wid: %d"
> % wid
> AssertionError: wid: 2
> assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
> At index 3 diff: 2 != 3
> Left contains more items, first extra item: 4
> Full diff:
> - [0, 1, 2, 2, 3, 4]
> ? ---
> + [0, 1, 2, 3, 4]
> ========================================== 1 failed, 1 passed in 158.88
> seconds ==========================================
> [09:23:33 qchen@qifan-10229: Impala.03112022] git branch
> IMPALA-10992-auto-scaling-planner-support
> * master
> [09:23:41 qchen@qifan-10229: Impala.03112022] git diff
> diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
> b/fe/src/main/java/org/apache/impala/service/Frontend.java
> index a921bb961..6df592f3f 100644
> --- a/fe/src/main/java/org/apache/impala/service/Frontend.java
> +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
> @@ -1644,6 +1644,7 @@ public class Frontend {
> markTimelineRetries(attempt, retryMsg, timeline);
> return req;
> } catch (InconsistentMetadataFetchException e) {
> + LOG.error("BAR: catch an InconsistentMetadataFetchException");
> if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
> markTimelineRetries(attempt, e.getMessage(), timeline);
> throw e;
> @@ -1819,14 +1820,18 @@ public class Frontend {
> } catch (Exception e) {
> if (queryCtx.isSetTransaction_id()) {
> try {
> + LOG.error("BAR: to abort Transaction");
> abortTransaction(queryCtx.getTransaction_id());
> + LOG.error("BAR: Transaction aborted");
> timeline.markEvent("Transaction aborted");
> } catch (TransactionException te) {
> LOG.error("Could not abort transaction because: " +
> te.getMessage());
> }
> } else if (queryCtx.isIs_kudu_transactional()) {
> try {
> + LOG.error("BAR: to abort kudu Transaction");
> abortKuduTransaction(queryCtx.getQuery_id());
> + LOG.error("BAR: kudu Transaction aborted");
> timeline.markEvent(
> "Kudu transaction aborted: " +
> queryCtx.getQuery_id().toString());
> } catch (TransactionException te) {
> diff --git a/tests/stress/test_acid_stress.py
> b/tests/stress/test_acid_stress.py
> index f6439ff3c..09ec874e7 100644
> --- a/tests/stress/test_acid_stress.py
> +++ b/tests/stress/test_acid_stress.py
> @@ -46,188 +46,188 @@ class TestAcidStress(ImpalaTestSuite):
> v.get_value('table_format').compression_codec == 'none'))
>
>
> -class TestAcidInsertsBasic(TestAcidStress):
> - @classmethod
> - def get_workload(self):
> - return super(TestAcidInsertsBasic, self).get_workload()
> -
> - @classmethod
> - def add_test_dimensions(cls):
> - super(TestAcidInsertsBasic, cls).add_test_dimensions()
> -
> - def _verify_result(self, result, expected_result):
> - """Verify invariants for 'run' and 'i'."""
> - assert len(result.data) > 0
> - run_max = -1
> - i_list = []
> - for line in result.data:
> - [run, i] = map(int, (line.split('\t')))
> - run_max = max(run_max, run)
> - i_list.append(i)
> - assert expected_result["run"] <= run_max # shouldn't see data
> overwritten in the past
> - i_list.sort()
> - if expected_result["run"] < run_max:
> - expected_result["run"] = run_max
> - expected_result["i"] = 0
> - return
> - assert i_list[-1] >= expected_result["i"]
> - assert i_list == range(i_list[-1] + 1) # 'i' should have all values
> from 0 to max_i
> - expected_result["i"] = i_list[-1]
> -
> - def _hive_role_write_inserts(self, tbl_name, partitioned):
> - """INSERT INTO/OVERWRITE a table several times from Hive."""
> - part_expr = "partition (p=1)" if partitioned else ""
> - for run in xrange(0, NUM_OVERWRITES):
> - OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, 0)
> - self.run_stmt_in_hive(OVERWRITE_SQL)
> - for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> - INSERT_SQL = """insert into table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, i)
> - self.run_stmt_in_hive(INSERT_SQL)
> -
> - def _impala_role_write_inserts(self, tbl_name, partitioned):
> - """INSERT INTO/OVERWRITE a table several times from Impala."""
> - try:
> - impalad_client = ImpalaTestSuite.create_impala_client()
> - part_expr = "partition (p=1)" if partitioned else ""
> - for run in xrange(0, NUM_OVERWRITES + 1):
> - OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, 0)
> - impalad_client.execute(OVERWRITE_SQL)
> - for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> - INSERT_SQL = """insert into table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, i)
> - impalad_client.execute(INSERT_SQL)
> - finally:
> - impalad_client.close()
> -
> - def _impala_role_read_inserts(self, tbl_name, needs_refresh,
> sleep_seconds):
> - """SELECT from a table many times until the expected final values are
> found."""
> - try:
> - impalad_client = ImpalaTestSuite.create_impala_client()
> - expected_result = {"run": -1, "i": 0}
> - accept_empty_table = True
> - while expected_result["run"] != NUM_OVERWRITES and \
> - expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
> - time.sleep(sleep_seconds)
> - if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
> - result = impalad_client.execute("select run, i from %s" % tbl_name)
> - if len(result.data) == 0:
> - assert accept_empty_table
> - continue
> - accept_empty_table = False
> - self._verify_result(result, expected_result)
> - finally:
> - impalad_client.close()
> -
> - def _create_table(self, full_tbl_name, partitioned):
> - """Creates test table with name 'full_tbl_name'. Table is partitioned if
> - 'partitioned' is set to True."""
> - part_expr = "partitioned by (p int)" if partitioned else ""
> -
> - CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
> - 'transactional_properties' = 'insert_only', 'transactional' =
> 'true')
> - """ % (full_tbl_name, part_expr)
> - self.client.execute("drop table if exists %s" % full_tbl_name)
> - self.client.execute(CREATE_SQL)
> -
> - def _run_test_read_hive_inserts(self, unique_database, partitioned):
> - """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> - several times. Consistency can be checked by using incremental values for
> - overwrites ('run') and inserts ('i').
> - """
> - tbl_name = "%s.test_read_hive_inserts" % unique_database
> - self._create_table(tbl_name, partitioned)
> -
> - run_tasks([
> - Task(self._hive_role_write_inserts, tbl_name, partitioned),
> - Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
> - sleep_seconds=3)])
> -
> - def _run_test_read_impala_inserts(self, unique_database, partitioned):
> - """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> - several times. Consistency can be checked by using incremental values for
> - overwrites ('run') and inserts ('i').
> - """
> - tbl_name = "%s.test_read_impala_inserts" % unique_database
> - self._create_table(tbl_name, partitioned)
> -
> - run_tasks([
> - Task(self._impala_role_write_inserts, tbl_name, partitioned),
> - Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
> - sleep_seconds=0.1)])
> -
> - @SkipIfHive2.acid
> - @SkipIfS3.hive
> - @SkipIfGCS.hive
> - @SkipIfCOS.hive
> - @pytest.mark.execute_serially
> - @pytest.mark.stress
> - def test_read_hive_inserts(self, unique_database):
> - """Check that Impala can read partitioned and non-partitioned ACID tables
> - written by Hive."""
> - for is_partitioned in [False, True]:
> - self._run_test_read_hive_inserts(unique_database, is_partitioned)
> -
> - @SkipIfHive2.acid
> - @pytest.mark.execute_serially
> - @pytest.mark.stress
> - def test_read_impala_inserts(self, unique_database):
> - """Check that Impala can read partitioned and non-partitioned ACID tables
> - written by Hive."""
> - for is_partitioned in [False, True]:
> - self._run_test_read_impala_inserts(unique_database, is_partitioned)
> -
> - def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite,
> sleep_sec):
> - insert_op = "OVERWRITE" if is_overwrite else "INTO"
> - try:
> - impalad_client = ImpalaTestSuite.create_impala_client()
> - impalad_client.execute(
> - """insert {op} table {tbl_name} partition({partition})
> - select sleep({sleep_ms})""".format(op=insert_op,
> tbl_name=tbl_name,
> - partition=partition, sleep_ms=sleep_sec * 1000))
> - finally:
> - impalad_client.close()
> -
> - @pytest.mark.execute_serially
> - @pytest.mark.stress
> - @SkipIf.not_hdfs
> - @UniqueDatabase.parametrize(sync_ddl=True)
> - def test_partitioned_inserts(self, unique_database):
> - """Check that the different ACID write operations take appropriate locks.
> - INSERT INTO: should take a shared lock
> - INSERT OVERWRITE: should take an exclusive lock
> - Both should take PARTITION-level lock in case of static partition
> insert."""
> - tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
> - self.client.set_configuration_option("SYNC_DDL", "true")
> - self.client.execute("""
> - CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
> - TBLPROPERTIES(
> -
> 'transactional_properties'='insert_only','transactional'='true')""".format(
> - tbl_name))
> - # Warmup INSERT
> - self.execute_query("alter table {0} add
> partition(p=0,q=0)".format(tbl_name))
> - sleep_sec = 5
> - task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
> - "p=1,q=1", False, sleep_sec)
> - # INSERT INTO the same partition can run in parallel.
> - duration = run_tasks([task_insert_into, task_insert_into])
> - assert duration < 3 * sleep_sec
> - task_insert_overwrite = Task(self._impala_role_partition_writer,
> tbl_name,
> - "p=1,q=1", True, sleep_sec)
> - # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
> - duration = run_tasks([task_insert_into, task_insert_overwrite])
> - assert duration > 4 * sleep_sec
> - # INSERT OVERWRITEs to the same partition should have mutual exclusion.
> - duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
> - assert duration > 4 * sleep_sec
> - task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
> tbl_name,
> - "p=1,q=2", True, sleep_sec)
> - # INSERT OVERWRITEs to different partitions can run in parallel.
> - duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
> - assert duration < 3 * sleep_sec
> -
> +#class TestAcidInsertsBasic(TestAcidStress):
> +# @classmethod
> +# def get_workload(self):
> +# return super(TestAcidInsertsBasic, self).get_workload()
> +#
> +# @classmethod
> +# def add_test_dimensions(cls):
> +# super(TestAcidInsertsBasic, cls).add_test_dimensions()
> +#
> +# def _verify_result(self, result, expected_result):
> +# """Verify invariants for 'run' and 'i'."""
> +# assert len(result.data) > 0
> +# run_max = -1
> +# i_list = []
> +# for line in result.data:
> +# [run, i] = map(int, (line.split('\t')))
> +# run_max = max(run_max, run)
> +# i_list.append(i)
> +# assert expected_result["run"] <= run_max # shouldn't see data
> overwritten in the past
> +# i_list.sort()
> +# if expected_result["run"] < run_max:
> +# expected_result["run"] = run_max
> +# expected_result["i"] = 0
> +# return
> +# assert i_list[-1] >= expected_result["i"]
> +# assert i_list == range(i_list[-1] + 1) # 'i' should have all values
> from 0 to max_i
> +# expected_result["i"] = i_list[-1]
> +#
> +# def _hive_role_write_inserts(self, tbl_name, partitioned):
> +# """INSERT INTO/OVERWRITE a table several times from Hive."""
> +# part_expr = "partition (p=1)" if partitioned else ""
> +# for run in xrange(0, NUM_OVERWRITES):
> +# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, 0)
> +# self.run_stmt_in_hive(OVERWRITE_SQL)
> +# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> +# INSERT_SQL = """insert into table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, i)
> +# self.run_stmt_in_hive(INSERT_SQL)
> +#
> +# def _impala_role_write_inserts(self, tbl_name, partitioned):
> +# """INSERT INTO/OVERWRITE a table several times from Impala."""
> +# try:
> +# impalad_client = ImpalaTestSuite.create_impala_client()
> +# part_expr = "partition (p=1)" if partitioned else ""
> +# for run in xrange(0, NUM_OVERWRITES + 1):
> +# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, 0)
> +# impalad_client.execute(OVERWRITE_SQL)
> +# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> +# INSERT_SQL = """insert into table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, i)
> +# impalad_client.execute(INSERT_SQL)
> +# finally:
> +# impalad_client.close()
> +#
> +# def _impala_role_read_inserts(self, tbl_name, needs_refresh,
> sleep_seconds):
> +# """SELECT from a table many times until the expected final values are
> found."""
> +# try:
> +# impalad_client = ImpalaTestSuite.create_impala_client()
> +# expected_result = {"run": -1, "i": 0}
> +# accept_empty_table = True
> +# while expected_result["run"] != NUM_OVERWRITES and \
> +# expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
> +# time.sleep(sleep_seconds)
> +# if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
> +# result = impalad_client.execute("select run, i from %s" % tbl_name)
> +# if len(result.data) == 0:
> +# assert accept_empty_table
> +# continue
> +# accept_empty_table = False
> +# self._verify_result(result, expected_result)
> +# finally:
> +# impalad_client.close()
> +#
> +# def _create_table(self, full_tbl_name, partitioned):
> +# """Creates test table with name 'full_tbl_name'. Table is partitioned if
> +# 'partitioned' is set to True."""
> +# part_expr = "partitioned by (p int)" if partitioned else ""
> +#
> +# CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
> +# 'transactional_properties' = 'insert_only', 'transactional' =
> 'true')
> +# """ % (full_tbl_name, part_expr)
> +# self.client.execute("drop table if exists %s" % full_tbl_name)
> +# self.client.execute(CREATE_SQL)
> +#
> +# def _run_test_read_hive_inserts(self, unique_database, partitioned):
> +# """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> +# several times. Consistency can be checked by using incremental values
> for
> +# overwrites ('run') and inserts ('i').
> +# """
> +# tbl_name = "%s.test_read_hive_inserts" % unique_database
> +# self._create_table(tbl_name, partitioned)
> +#
> +# run_tasks([
> +# Task(self._hive_role_write_inserts, tbl_name, partitioned),
> +# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
> +# sleep_seconds=3)])
> +#
> +# def _run_test_read_impala_inserts(self, unique_database, partitioned):
> +# """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> +# several times. Consistency can be checked by using incremental values
> for
> +# overwrites ('run') and inserts ('i').
> +# """
> +# tbl_name = "%s.test_read_impala_inserts" % unique_database
> +# self._create_table(tbl_name, partitioned)
> +#
> +# run_tasks([
> +# Task(self._impala_role_write_inserts, tbl_name, partitioned),
> +# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
> +# sleep_seconds=0.1)])
> +#
> +# @SkipIfHive2.acid
> +# @SkipIfS3.hive
> +# @SkipIfGCS.hive
> +# @SkipIfCOS.hive
> +# @pytest.mark.execute_serially
> +# @pytest.mark.stress
> +# def test_read_hive_inserts(self, unique_database):
> +# """Check that Impala can read partitioned and non-partitioned ACID
> tables
> +# written by Hive."""
> +# for is_partitioned in [False, True]:
> +# self._run_test_read_hive_inserts(unique_database, is_partitioned)
> +#
> +# @SkipIfHive2.acid
> +# @pytest.mark.execute_serially
> +# @pytest.mark.stress
> +# def test_read_impala_inserts(self, unique_database):
> +# """Check that Impala can read partitioned and non-partitioned ACID
> tables
> +# written by Hive."""
> +# for is_partitioned in [False, True]:
> +# self._run_test_read_impala_inserts(unique_database, is_partitioned)
> +#
> +# def _impala_role_partition_writer(self, tbl_name, partition,
> is_overwrite, sleep_sec):
> +# insert_op = "OVERWRITE" if is_overwrite else "INTO"
> +# try:
> +# impalad_client = ImpalaTestSuite.create_impala_client()
> +# impalad_client.execute(
> +# """insert {op} table {tbl_name} partition({partition})
> +# select sleep({sleep_ms})""".format(op=insert_op,
> tbl_name=tbl_name,
> +# partition=partition, sleep_ms=sleep_sec * 1000))
> +# finally:
> +# impalad_client.close()
> +#
> +# @pytest.mark.execute_serially
> +# @pytest.mark.stress
> +# @SkipIf.not_hdfs
> +# @UniqueDatabase.parametrize(sync_ddl=True)
> +# def test_partitioned_inserts(self, unique_database):
> +# """Check that the different ACID write operations take appropriate
> locks.
> +# INSERT INTO: should take a shared lock
> +# INSERT OVERWRITE: should take an exclusive lock
> +# Both should take PARTITION-level lock in case of static partition
> insert."""
> +# tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
> +# self.client.set_configuration_option("SYNC_DDL", "true")
> +# self.client.execute("""
> +# CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
> +# TBLPROPERTIES(
> +#
> 'transactional_properties'='insert_only','transactional'='true')""".format(
> +# tbl_name))
> +# # Warmup INSERT
> +# self.execute_query("alter table {0} add
> partition(p=0,q=0)".format(tbl_name))
> +# sleep_sec = 5
> +# task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
> +# "p=1,q=1", False, sleep_sec)
> +# # INSERT INTO the same partition can run in parallel.
> +# duration = run_tasks([task_insert_into, task_insert_into])
> +# assert duration < 3 * sleep_sec
> +# task_insert_overwrite = Task(self._impala_role_partition_writer,
> tbl_name,
> +# "p=1,q=1", True, sleep_sec)
> +# # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
> +# duration = run_tasks([task_insert_into, task_insert_overwrite])
> +# assert duration > 4 * sleep_sec
> +# # INSERT OVERWRITEs to the same partition should have mutual exclusion.
> +# duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
> +# assert duration > 4 * sleep_sec
> +# task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
> tbl_name,
> +# "p=1,q=2", True, sleep_sec)
> +# # INSERT OVERWRITEs to different partitions can run in parallel.
> +# duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
> +# assert duration < 3 * sleep_sec
> +#
>
> class TestConcurrentAcidInserts(TestAcidStress):
> @classmethod
> (END)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)