[
https://issues.apache.org/jira/browse/IMPALA-11189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510946#comment-17510946
]
ASF subversion and git services commented on IMPALA-11189:
----------------------------------------------------------
Commit 07a3e6e0df8ed05577a52d896d0ccfbbf6dd8e28 in impala's branch
refs/heads/master from Qifan Chen
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=07a3e6e ]
IMPALA-10992 Planner changes for estimate peak memory
This patch provides replan support for multiple executor group sets.
Each executor group set is associated with a distinct number of nodes
and a threshold for estimated memory per host in bytes that can be
denoted as [<group_name_prefix>:<#nodes>, <threshold>].
In the patch, a query of type EXPLAIN, QUERY or DML can be compiled
more than once. In each attempt, per host memory is estimated and
compared with the threshold of an executor group set. If the estimated
memory is no more than the threshold, the iteration process terminates
and the final plan is determined. The executor group set with the
threshold is selected to run the query.
A new query option 'enable_replan', default to 1 (enabled), is added.
It can be set to 0 to disable this patch and to generate the distributed
plan for the default executor group.
To avoid long compilation time, the following enhancement is enabled.
Note 1) can be disabled when relevant meta-data change is
detected.
1. Authorization is performed only for the 1st compilation;
2. openTransaction() is called for transactional queries in 1st
compilation and the saved transactional info is used in
subsequent compilations. Similar logic is applied to Kudu
transactional queries.
To facilitate testing, the patch imposes an artificial two executor
group setup in FE as follows.
1. [regular:<#nodes>, 64MB]
2. [large:<#nodes>, 8PB]
This setup is enabled when a new query option 'test_replan' is set
to 1 in backend tests, or RuntimeEnv.INSTANCE.isTestEnv() is true as
in most frontend tests. This query option is set to 0 by default.
Compilation time increases when a query is compiled in several
iterations, as shown below for several TPCDs queries. The increase
is mostly due to redundant work in either single node plan creation
or recomputing value transfer graph phase. For small queries, the
increase can be avoided if they can be compiled in single iteration
by properly setting the smallest threshold among all executor group
sets. For example, for the set of queries listed below, the smallest
threshold can be set to 320MB to catch both q15 and q21 in one
compilation.
Compilation time (ms)
Queries Estimated Memory 2-iterations 1-iteration Percentage of
increase
q1 408MB 60.14 25.75 133.56%
q11 1.37GB 261.00 109.61 138.11%
q10a 519MB 139.24 54.52 155.39%
q13 339MB 143.82 60.08 139.38%
q14a 3.56GB 762.68 312.92 143.73%
q14b 2.20GB 522.01 245.13 112.95%
q15 314MB 9.73 4.28 127.33%
q21 275MB 16.00 8.18 95.59%
q23a 1.50GB 461.69 231.78 99.19%
q23b 1.34GB 461.31 219.61 110.05%
q4 2.60GB 218.05 105.07 107.52%
q67 5.16GB 694.59 334.24 101.82%
Testing:
1. Almost all FE and BE tests are now run in the artificial two
executor setup except a few where a specific cluster configuration
is desirable;
2. Ran core tests successfully;
3. Added a new observability test and a new query assignment test;
4. Disabled concurrent insert test (test_concurrent_inserts) and
failing inserts (test_failing_inserts) test in local catalog mode
due to flakiness. Reported both in IMPALA-11189 and IMPALA-11191.
Change-Id: I75cf17290be2c64fd4b732a5505bdac31869712a
Reviewed-on: http://gerrit.cloudera.org:8080/18178
Reviewed-by: Qifan Chen <[email protected]>
Tested-by: Qifan Chen <[email protected]>
> Concurrent insert ACID tests are broken in local catalog mode
> -------------------------------------------------------------
>
> Key: IMPALA-11189
> URL: https://issues.apache.org/jira/browse/IMPALA-11189
> Project: IMPALA
> Issue Type: Bug
> Components: Catalog
> Reporter: Qifan Chen
> Priority: Major
>
> Stress test test_concurrent_inserts (in tests/stress/test_acid_stress.py) can
> fail repeatedly in local catalog mode. In this case, the concurrent checker
> query (select * from <table>) returns duplicated rows such as reported below,
> where row [0,2] is duplicated.
> The failure can be reproduced quite easily by running the test (i.e.,
> TestConcurrentAcidInserts) first, via commenting out all the tests prior to
> it in the test file tests/stress/test_acid_stress.py.
> Setup:
> 1. Build the impala and clear HMS in case in a bad state:
> $IMPALA_HOME/buildall.sh -format_metastore -notests
> 2. Start the cluster in local catalog mode:
> $IMPALA_HOME/bin/start-impala-cluster.py --impalad_args
> --use_local_catalog=true --catalogd_args --catalog_topic_mode=minimal
> --catalogd_args --hms_event_polling_interval_s=1
> 3. Run the modified stress test: $IMPALA_HOME/bin/impala-py.test
> $IMPALA_TESTS/stress/test_acid_stress.py
> Error reported:
> {code:java}
> 09:11:00 qchen@qifan-10229: Impala.03112022] test_acid_stress
> rootLoggerLevel = INFO
> ================================================== test session starts
> ===================================================
> platform linux2 -- Python 2.7.16, pytest-2.9.2, py-1.4.32, pluggy-0.3.1 --
> /home/qchen/Impala.03112022/infra/python/env-gcc7.5.0/bin/python
> cachedir: tests/.cache
> rootdir: /home/qchen/Impala.03112022/tests, inifile: pytest.ini
> plugins: xdist-1.17.1, timeout-1.2.1, random-0.2, forked-0.2
> timeout: 7200s method: signal
> collected 2 items
> tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::test_concurrent_inserts[unique_database0]
> FAILED
> tests/stress/test_acid_stress.py::TestFailingAcidInserts::test_failing_inserts[unique_database0]
> PASSED
> ================================================ short test summary info
> =================================================
> FAIL
> tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]
> ======================================================== FAILURES
> ========================================================
> __________________________
> TestConcurrentAcidInserts.test_concurrent_inserts[unique_database0]
> ___________________________
> tests/stress/test_acid_stress.py:307: in test_concurrent_inserts
> run_tasks(writers + checkers)
> tests/stress/stress_util.py:45: in run_tasks
> pool.map_async(Task.run, tasks).get(timeout_seconds)
> ../Impala.03082022/toolchain/toolchain-packages-gcc7.5.0/python-2.7.16/lib/python2.7/multiprocessing/pool.py:572:
> in get
> raise self._value
> E AssertionError: wid: 2
> E assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
> E At index 3 diff: 2 != 3
> E Left contains more items, first extra item: 4
> E Full diff:
> E - [0, 1, 2, 2, 3, 4]
> E ? ---
> E + [0, 1, 2, 3, 4]
> ------------------------------------------------- Captured stderr setup
> --------------------------------------------------
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21000
> -- connecting to localhost:21050 with impyla
> -- 2022-03-16 09:20:54,762 INFO MainThread: Closing active operation
> -- connecting to localhost:28000 with impyla
> -- 2022-03-16 09:20:54,774 INFO MainThread: Closing active operation
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET sync_ddl=True;
> -- executing against localhost:21000
> DROP DATABASE IF EXISTS `test_concurrent_inserts_8933345c` CASCADE;
> -- 2022-03-16 09:20:54,808 INFO MainThread: Started query
> 28457f4c7e77cdec:c6d3731900000000
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET sync_ddl=True;
> -- executing against localhost:21000
> CREATE DATABASE `test_concurrent_inserts_8933345c`;
> -- 2022-03-16 09:20:54,877 INFO MainThread: Started query
> 374bf99aea680523:48d2405400000000
> -- 2022-03-16 09:21:01,164 INFO MainThread: Created database
> "test_concurrent_inserts_8933345c" for test ID
> "stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]"
> -------------------------------------------------- Captured stderr call
> --------------------------------------------------
> SET SYNC_DDL=true;
> -- executing against localhost:21000
> drop table if exists test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:01,173 INFO MainThread: Started query
> 20480c2a1d336d35:c2d84edd00000000
> -- executing against localhost:21000
> create table test_concurrent_inserts_8933345c.test_concurrent_inserts (wid
> int, i int) TBLPROPERTIES (
> 'transactional_properties' = 'insert_only', 'transactional' = 'true')
> ;
> -- 2022-03-16 09:21:01,294 INFO MainThread: Started query
> 754969473483b4e9:acfc852300000000
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21000
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21001
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- executing against localhost:21000
> -- connecting to: localhost:21000
> -- connecting to: localhost:21002
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21001
> SET
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21002
> -- connecting to: localhost:21000
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (0, 0);
> -- executing against localhost:21001
> -- connecting to: localhost:21002
> -- executing against localhost:21002
> -- executing against localhost:21000
> -- connecting to: localhost:21001
> -- executing against localhost:21001
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (1, 0);
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 0);
> -- executing against localhost:21000
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (3, 0);
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (4, 0);
> -- executing against localhost:21001
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (5, 0);
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:07,824 INFO Thread-3: Started query
> 6f4e9d38d1723252:a6a7368f00000000
> -- 2022-03-16 09:21:07,859 INFO Thread-4: Started query
> 33423e0145b7d3e0:ea8d626200000000
> -- 2022-03-16 09:21:07,861 INFO Thread-8: Started query
> 4046d8edde82931b:aa11c84800000000
> -- 2022-03-16 09:21:07,875 INFO Thread-9: Started query
> 594d63b7814c31ab:8f2b92ca00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 1);
> -- 2022-03-16 09:21:08,229 INFO Thread-4: Started query
> a94dab601c125bb4:8988934300000000
> -- executing against localhost:21000
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 2);
> -- 2022-03-16 09:21:08,384 INFO Thread-3: Started query
> 4040597eba7beb36:4cfa08c800000000
> -- 2022-03-16 09:21:08,409 INFO Thread-4: Started query
> 854e1aee63575861:dbf8bafb00000000
> -- executing against localhost:21002
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- executing against localhost:21001
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,435 INFO Thread-9: Started query
> 7e4d4cf54a44cf03:868efcb100000000
> -- 2022-03-16 09:21:08,456 INFO Thread-8: Started query
> 4645788cf2a4d401:0aea969e00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 3);
> -- 2022-03-16 09:21:08,586 INFO Thread-4: Started query
> 4e4cdd976dfa3358:6c455b7300000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 4);
> -- 2022-03-16 09:21:08,711 INFO Thread-4: Started query
> 6f46942eb8807e5f:ffbb146000000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 5);
> -- executing against localhost:21000
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,959 INFO Thread-3: Started query
> d940fa405a776bef:3705218900000000
> -- 2022-03-16 09:21:08,977 INFO Thread-4: Started query
> f947061a7c45901c:12b2ba9d00000000
> -- executing against localhost:21001
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,997 INFO Thread-9: Started query
> f54a0aa01bbb2f86:3b89ae9600000000
> -- executing against localhost:21002
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:09,147 INFO Thread-8: Started query
> fa418523bc0552ce:f0e49b1a00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts
> values (2, 6);
> -- 2022-03-16 09:21:09,250 INFO Thread-4: Started query
> fc4751d49d1a1aa2:4bcb48d600000000
> -- closing connection to: localhost:21002
> Traceback (most recent call last):
> File "/home/qchen/Impala.03112022/tests/stress/stress_util.py", line 35, in
> run
> return self.func(*self.args, **self.kwargs)
> File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
> 276, in _impala_role_concurrent_checker
> verify_result_set(result)
> File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line
> 269, in verify_result_set
> assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), "wid: %d"
> % wid
> AssertionError: wid: 2
> assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
> At index 3 diff: 2 != 3
> Left contains more items, first extra item: 4
> Full diff:
> - [0, 1, 2, 2, 3, 4]
> ? ---
> + [0, 1, 2, 3, 4]
> ========================================== 1 failed, 1 passed in 158.88
> seconds ==========================================
> [09:23:33 qchen@qifan-10229: Impala.03112022] git branch
> IMPALA-10992-auto-scaling-planner-support
> * master
> [09:23:41 qchen@qifan-10229: Impala.03112022] git diff
> diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
> b/fe/src/main/java/org/apache/impala/service/Frontend.java
> index a921bb961..6df592f3f 100644
> --- a/fe/src/main/java/org/apache/impala/service/Frontend.java
> +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
> @@ -1644,6 +1644,7 @@ public class Frontend {
> markTimelineRetries(attempt, retryMsg, timeline);
> return req;
> } catch (InconsistentMetadataFetchException e) {
> + LOG.error("BAR: catch an InconsistentMetadataFetchException");
> if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
> markTimelineRetries(attempt, e.getMessage(), timeline);
> throw e;
> @@ -1819,14 +1820,18 @@ public class Frontend {
> } catch (Exception e) {
> if (queryCtx.isSetTransaction_id()) {
> try {
> + LOG.error("BAR: to abort Transaction");
> abortTransaction(queryCtx.getTransaction_id());
> + LOG.error("BAR: Transaction aborted");
> timeline.markEvent("Transaction aborted");
> } catch (TransactionException te) {
> LOG.error("Could not abort transaction because: " +
> te.getMessage());
> }
> } else if (queryCtx.isIs_kudu_transactional()) {
> try {
> + LOG.error("BAR: to abort kudu Transaction");
> abortKuduTransaction(queryCtx.getQuery_id());
> + LOG.error("BAR: kudu Transaction aborted");
> timeline.markEvent(
> "Kudu transaction aborted: " +
> queryCtx.getQuery_id().toString());
> } catch (TransactionException te) {
> diff --git a/tests/stress/test_acid_stress.py
> b/tests/stress/test_acid_stress.py
> index f6439ff3c..09ec874e7 100644
> --- a/tests/stress/test_acid_stress.py
> +++ b/tests/stress/test_acid_stress.py
> @@ -46,188 +46,188 @@ class TestAcidStress(ImpalaTestSuite):
> v.get_value('table_format').compression_codec == 'none'))
>
>
> -class TestAcidInsertsBasic(TestAcidStress):
> - @classmethod
> - def get_workload(self):
> - return super(TestAcidInsertsBasic, self).get_workload()
> -
> - @classmethod
> - def add_test_dimensions(cls):
> - super(TestAcidInsertsBasic, cls).add_test_dimensions()
> -
> - def _verify_result(self, result, expected_result):
> - """Verify invariants for 'run' and 'i'."""
> - assert len(result.data) > 0
> - run_max = -1
> - i_list = []
> - for line in result.data:
> - [run, i] = map(int, (line.split('\t')))
> - run_max = max(run_max, run)
> - i_list.append(i)
> - assert expected_result["run"] <= run_max # shouldn't see data
> overwritten in the past
> - i_list.sort()
> - if expected_result["run"] < run_max:
> - expected_result["run"] = run_max
> - expected_result["i"] = 0
> - return
> - assert i_list[-1] >= expected_result["i"]
> - assert i_list == range(i_list[-1] + 1) # 'i' should have all values
> from 0 to max_i
> - expected_result["i"] = i_list[-1]
> -
> - def _hive_role_write_inserts(self, tbl_name, partitioned):
> - """INSERT INTO/OVERWRITE a table several times from Hive."""
> - part_expr = "partition (p=1)" if partitioned else ""
> - for run in xrange(0, NUM_OVERWRITES):
> - OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, 0)
> - self.run_stmt_in_hive(OVERWRITE_SQL)
> - for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> - INSERT_SQL = """insert into table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, i)
> - self.run_stmt_in_hive(INSERT_SQL)
> -
> - def _impala_role_write_inserts(self, tbl_name, partitioned):
> - """INSERT INTO/OVERWRITE a table several times from Impala."""
> - try:
> - impalad_client = ImpalaTestSuite.create_impala_client()
> - part_expr = "partition (p=1)" if partitioned else ""
> - for run in xrange(0, NUM_OVERWRITES + 1):
> - OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, 0)
> - impalad_client.execute(OVERWRITE_SQL)
> - for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> - INSERT_SQL = """insert into table %s %s values (%i, %i)
> - """ % (tbl_name, part_expr, run, i)
> - impalad_client.execute(INSERT_SQL)
> - finally:
> - impalad_client.close()
> -
> - def _impala_role_read_inserts(self, tbl_name, needs_refresh,
> sleep_seconds):
> - """SELECT from a table many times until the expected final values are
> found."""
> - try:
> - impalad_client = ImpalaTestSuite.create_impala_client()
> - expected_result = {"run": -1, "i": 0}
> - accept_empty_table = True
> - while expected_result["run"] != NUM_OVERWRITES and \
> - expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
> - time.sleep(sleep_seconds)
> - if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
> - result = impalad_client.execute("select run, i from %s" % tbl_name)
> - if len(result.data) == 0:
> - assert accept_empty_table
> - continue
> - accept_empty_table = False
> - self._verify_result(result, expected_result)
> - finally:
> - impalad_client.close()
> -
> - def _create_table(self, full_tbl_name, partitioned):
> - """Creates test table with name 'full_tbl_name'. Table is partitioned if
> - 'partitioned' is set to True."""
> - part_expr = "partitioned by (p int)" if partitioned else ""
> -
> - CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
> - 'transactional_properties' = 'insert_only', 'transactional' =
> 'true')
> - """ % (full_tbl_name, part_expr)
> - self.client.execute("drop table if exists %s" % full_tbl_name)
> - self.client.execute(CREATE_SQL)
> -
> - def _run_test_read_hive_inserts(self, unique_database, partitioned):
> - """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> - several times. Consistency can be checked by using incremental values for
> - overwrites ('run') and inserts ('i').
> - """
> - tbl_name = "%s.test_read_hive_inserts" % unique_database
> - self._create_table(tbl_name, partitioned)
> -
> - run_tasks([
> - Task(self._hive_role_write_inserts, tbl_name, partitioned),
> - Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
> - sleep_seconds=3)])
> -
> - def _run_test_read_impala_inserts(self, unique_database, partitioned):
> - """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> - several times. Consistency can be checked by using incremental values for
> - overwrites ('run') and inserts ('i').
> - """
> - tbl_name = "%s.test_read_impala_inserts" % unique_database
> - self._create_table(tbl_name, partitioned)
> -
> - run_tasks([
> - Task(self._impala_role_write_inserts, tbl_name, partitioned),
> - Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
> - sleep_seconds=0.1)])
> -
> - @SkipIfHive2.acid
> - @SkipIfS3.hive
> - @SkipIfGCS.hive
> - @SkipIfCOS.hive
> - @pytest.mark.execute_serially
> - @pytest.mark.stress
> - def test_read_hive_inserts(self, unique_database):
> - """Check that Impala can read partitioned and non-partitioned ACID tables
> - written by Hive."""
> - for is_partitioned in [False, True]:
> - self._run_test_read_hive_inserts(unique_database, is_partitioned)
> -
> - @SkipIfHive2.acid
> - @pytest.mark.execute_serially
> - @pytest.mark.stress
> - def test_read_impala_inserts(self, unique_database):
> - """Check that Impala can read partitioned and non-partitioned ACID tables
> - written by Hive."""
> - for is_partitioned in [False, True]:
> - self._run_test_read_impala_inserts(unique_database, is_partitioned)
> -
> - def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite,
> sleep_sec):
> - insert_op = "OVERWRITE" if is_overwrite else "INTO"
> - try:
> - impalad_client = ImpalaTestSuite.create_impala_client()
> - impalad_client.execute(
> - """insert {op} table {tbl_name} partition({partition})
> - select sleep({sleep_ms})""".format(op=insert_op,
> tbl_name=tbl_name,
> - partition=partition, sleep_ms=sleep_sec * 1000))
> - finally:
> - impalad_client.close()
> -
> - @pytest.mark.execute_serially
> - @pytest.mark.stress
> - @SkipIf.not_hdfs
> - @UniqueDatabase.parametrize(sync_ddl=True)
> - def test_partitioned_inserts(self, unique_database):
> - """Check that the different ACID write operations take appropriate locks.
> - INSERT INTO: should take a shared lock
> - INSERT OVERWRITE: should take an exclusive lock
> - Both should take PARTITION-level lock in case of static partition
> insert."""
> - tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
> - self.client.set_configuration_option("SYNC_DDL", "true")
> - self.client.execute("""
> - CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
> - TBLPROPERTIES(
> -
> 'transactional_properties'='insert_only','transactional'='true')""".format(
> - tbl_name))
> - # Warmup INSERT
> - self.execute_query("alter table {0} add
> partition(p=0,q=0)".format(tbl_name))
> - sleep_sec = 5
> - task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
> - "p=1,q=1", False, sleep_sec)
> - # INSERT INTO the same partition can run in parallel.
> - duration = run_tasks([task_insert_into, task_insert_into])
> - assert duration < 3 * sleep_sec
> - task_insert_overwrite = Task(self._impala_role_partition_writer,
> tbl_name,
> - "p=1,q=1", True, sleep_sec)
> - # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
> - duration = run_tasks([task_insert_into, task_insert_overwrite])
> - assert duration > 4 * sleep_sec
> - # INSERT OVERWRITEs to the same partition should have mutual exclusion.
> - duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
> - assert duration > 4 * sleep_sec
> - task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
> tbl_name,
> - "p=1,q=2", True, sleep_sec)
> - # INSERT OVERWRITEs to different partitions can run in parallel.
> - duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
> - assert duration < 3 * sleep_sec
> -
> +#class TestAcidInsertsBasic(TestAcidStress):
> +# @classmethod
> +# def get_workload(self):
> +# return super(TestAcidInsertsBasic, self).get_workload()
> +#
> +# @classmethod
> +# def add_test_dimensions(cls):
> +# super(TestAcidInsertsBasic, cls).add_test_dimensions()
> +#
> +# def _verify_result(self, result, expected_result):
> +# """Verify invariants for 'run' and 'i'."""
> +# assert len(result.data) > 0
> +# run_max = -1
> +# i_list = []
> +# for line in result.data:
> +# [run, i] = map(int, (line.split('\t')))
> +# run_max = max(run_max, run)
> +# i_list.append(i)
> +# assert expected_result["run"] <= run_max # shouldn't see data
> overwritten in the past
> +# i_list.sort()
> +# if expected_result["run"] < run_max:
> +# expected_result["run"] = run_max
> +# expected_result["i"] = 0
> +# return
> +# assert i_list[-1] >= expected_result["i"]
> +# assert i_list == range(i_list[-1] + 1) # 'i' should have all values
> from 0 to max_i
> +# expected_result["i"] = i_list[-1]
> +#
> +# def _hive_role_write_inserts(self, tbl_name, partitioned):
> +# """INSERT INTO/OVERWRITE a table several times from Hive."""
> +# part_expr = "partition (p=1)" if partitioned else ""
> +# for run in xrange(0, NUM_OVERWRITES):
> +# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, 0)
> +# self.run_stmt_in_hive(OVERWRITE_SQL)
> +# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> +# INSERT_SQL = """insert into table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, i)
> +# self.run_stmt_in_hive(INSERT_SQL)
> +#
> +# def _impala_role_write_inserts(self, tbl_name, partitioned):
> +# """INSERT INTO/OVERWRITE a table several times from Impala."""
> +# try:
> +# impalad_client = ImpalaTestSuite.create_impala_client()
> +# part_expr = "partition (p=1)" if partitioned else ""
> +# for run in xrange(0, NUM_OVERWRITES + 1):
> +# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, 0)
> +# impalad_client.execute(OVERWRITE_SQL)
> +# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> +# INSERT_SQL = """insert into table %s %s values (%i, %i)
> +# """ % (tbl_name, part_expr, run, i)
> +# impalad_client.execute(INSERT_SQL)
> +# finally:
> +# impalad_client.close()
> +#
> +# def _impala_role_read_inserts(self, tbl_name, needs_refresh,
> sleep_seconds):
> +# """SELECT from a table many times until the expected final values are
> found."""
> +# try:
> +# impalad_client = ImpalaTestSuite.create_impala_client()
> +# expected_result = {"run": -1, "i": 0}
> +# accept_empty_table = True
> +# while expected_result["run"] != NUM_OVERWRITES and \
> +# expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
> +# time.sleep(sleep_seconds)
> +# if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
> +# result = impalad_client.execute("select run, i from %s" % tbl_name)
> +# if len(result.data) == 0:
> +# assert accept_empty_table
> +# continue
> +# accept_empty_table = False
> +# self._verify_result(result, expected_result)
> +# finally:
> +# impalad_client.close()
> +#
> +# def _create_table(self, full_tbl_name, partitioned):
> +# """Creates test table with name 'full_tbl_name'. Table is partitioned if
> +# 'partitioned' is set to True."""
> +# part_expr = "partitioned by (p int)" if partitioned else ""
> +#
> +# CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
> +# 'transactional_properties' = 'insert_only', 'transactional' =
> 'true')
> +# """ % (full_tbl_name, part_expr)
> +# self.client.execute("drop table if exists %s" % full_tbl_name)
> +# self.client.execute(CREATE_SQL)
> +#
> +# def _run_test_read_hive_inserts(self, unique_database, partitioned):
> +# """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> +# several times. Consistency can be checked by using incremental values
> for
> +# overwrites ('run') and inserts ('i').
> +# """
> +# tbl_name = "%s.test_read_hive_inserts" % unique_database
> +# self._create_table(tbl_name, partitioned)
> +#
> +# run_tasks([
> +# Task(self._hive_role_write_inserts, tbl_name, partitioned),
> +# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
> +# sleep_seconds=3)])
> +#
> +# def _run_test_read_impala_inserts(self, unique_database, partitioned):
> +# """Check that Impala can read a single insert only ACID table
> (over)written by Hive
> +# several times. Consistency can be checked by using incremental values
> for
> +# overwrites ('run') and inserts ('i').
> +# """
> +# tbl_name = "%s.test_read_impala_inserts" % unique_database
> +# self._create_table(tbl_name, partitioned)
> +#
> +# run_tasks([
> +# Task(self._impala_role_write_inserts, tbl_name, partitioned),
> +# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
> +# sleep_seconds=0.1)])
> +#
> +# @SkipIfHive2.acid
> +# @SkipIfS3.hive
> +# @SkipIfGCS.hive
> +# @SkipIfCOS.hive
> +# @pytest.mark.execute_serially
> +# @pytest.mark.stress
> +# def test_read_hive_inserts(self, unique_database):
> +# """Check that Impala can read partitioned and non-partitioned ACID
> tables
> +# written by Hive."""
> +# for is_partitioned in [False, True]:
> +# self._run_test_read_hive_inserts(unique_database, is_partitioned)
> +#
> +# @SkipIfHive2.acid
> +# @pytest.mark.execute_serially
> +# @pytest.mark.stress
> +# def test_read_impala_inserts(self, unique_database):
> +# """Check that Impala can read partitioned and non-partitioned ACID
> tables
> +# written by Hive."""
> +# for is_partitioned in [False, True]:
> +# self._run_test_read_impala_inserts(unique_database, is_partitioned)
> +#
> +# def _impala_role_partition_writer(self, tbl_name, partition,
> is_overwrite, sleep_sec):
> +# insert_op = "OVERWRITE" if is_overwrite else "INTO"
> +# try:
> +# impalad_client = ImpalaTestSuite.create_impala_client()
> +# impalad_client.execute(
> +# """insert {op} table {tbl_name} partition({partition})
> +# select sleep({sleep_ms})""".format(op=insert_op,
> tbl_name=tbl_name,
> +# partition=partition, sleep_ms=sleep_sec * 1000))
> +# finally:
> +# impalad_client.close()
> +#
> +# @pytest.mark.execute_serially
> +# @pytest.mark.stress
> +# @SkipIf.not_hdfs
> +# @UniqueDatabase.parametrize(sync_ddl=True)
> +# def test_partitioned_inserts(self, unique_database):
> +# """Check that the different ACID write operations take appropriate
> locks.
> +# INSERT INTO: should take a shared lock
> +# INSERT OVERWRITE: should take an exclusive lock
> +# Both should take PARTITION-level lock in case of static partition
> insert."""
> +# tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
> +# self.client.set_configuration_option("SYNC_DDL", "true")
> +# self.client.execute("""
> +# CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
> +# TBLPROPERTIES(
> +#
> 'transactional_properties'='insert_only','transactional'='true')""".format(
> +# tbl_name))
> +# # Warmup INSERT
> +# self.execute_query("alter table {0} add
> partition(p=0,q=0)".format(tbl_name))
> +# sleep_sec = 5
> +# task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
> +# "p=1,q=1", False, sleep_sec)
> +# # INSERT INTO the same partition can run in parallel.
> +# duration = run_tasks([task_insert_into, task_insert_into])
> +# assert duration < 3 * sleep_sec
> +# task_insert_overwrite = Task(self._impala_role_partition_writer,
> tbl_name,
> +# "p=1,q=1", True, sleep_sec)
> +# # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
> +# duration = run_tasks([task_insert_into, task_insert_overwrite])
> +# assert duration > 4 * sleep_sec
> +# # INSERT OVERWRITEs to the same partition should have mutual exclusion.
> +# duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
> +# assert duration > 4 * sleep_sec
> +# task_insert_overwrite_2 = Task(self._impala_role_partition_writer,
> tbl_name,
> +# "p=1,q=2", True, sleep_sec)
> +# # INSERT OVERWRITEs to different partitions can run in parallel.
> +# duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
> +# assert duration < 3 * sleep_sec
> +#
>
> class TestConcurrentAcidInserts(TestAcidStress):
> @classmethod
> (END)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]