[ 
https://issues.apache.org/jira/browse/IMPALA-11189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509140#comment-17509140
 ] 

Quanlong Huang commented on IMPALA-11189:
-----------------------------------------

Maybe I'm missing something, but I can't reproduce the issue.

Could you upload the log files of catalogd and impalads? [~sql_forever]

> Concurrent insert ACID tests are broken in local catalog mode
> -------------------------------------------------------------
>
>                 Key: IMPALA-11189
>                 URL: https://issues.apache.org/jira/browse/IMPALA-11189
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Catalog
>            Reporter: Qifan Chen
>            Priority: Major
>
> Stress test test_concurrent_inserts (in tests/stress/test_acid_stress.py) can 
> fail repeatedly in local catalog mode. In this case, the concurrent checker 
> query (select * from <table>) returns duplicated rows, as in the error 
> reported below, where the row with wid=2, i=2 is returned twice. 
> The failure can be reproduced quite easily by making the test class 
> TestConcurrentAcidInserts run first, i.e. by commenting out all the tests 
> that precede it in the test file tests/stress/test_acid_stress.py. 
> Setup:
> 1. Build Impala and clear the HMS in case it is in a bad state: 
> $IMPALA_HOME/buildall.sh -format_metastore -notests
> 2. Start the cluster in local catalog mode: 
> $IMPALA_HOME/bin/start-impala-cluster.py --impalad_args 
> --use_local_catalog=true --catalogd_args  --catalog_topic_mode=minimal 
> --catalogd_args --hms_event_polling_interval_s=1
> 3. Run the modified stress test: $IMPALA_HOME/bin/impala-py.test 
> $IMPALA_TESTS/stress/test_acid_stress.py
> Error reported:
> {code:java}
> 09:11:00 qchen@qifan-10229: Impala.03112022] test_acid_stress
> rootLoggerLevel = INFO
> ================================================== test session starts 
> ===================================================
> platform linux2 -- Python 2.7.16, pytest-2.9.2, py-1.4.32, pluggy-0.3.1 -- 
> /home/qchen/Impala.03112022/infra/python/env-gcc7.5.0/bin/python
> cachedir: tests/.cache
> rootdir: /home/qchen/Impala.03112022/tests, inifile: pytest.ini
> plugins: xdist-1.17.1, timeout-1.2.1, random-0.2, forked-0.2
> timeout: 7200s method: signal
> collected 2 items 
> tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::test_concurrent_inserts[unique_database0]
>  FAILED
> tests/stress/test_acid_stress.py::TestFailingAcidInserts::test_failing_inserts[unique_database0]
>  PASSED
> ================================================ short test summary info 
> =================================================
> FAIL 
> tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]
> ======================================================== FAILURES 
> ========================================================
> __________________________ 
> TestConcurrentAcidInserts.test_concurrent_inserts[unique_database0] 
> ___________________________
> tests/stress/test_acid_stress.py:307: in test_concurrent_inserts
>     run_tasks(writers + checkers)
> tests/stress/stress_util.py:45: in run_tasks
>     pool.map_async(Task.run, tasks).get(timeout_seconds)
> ../Impala.03082022/toolchain/toolchain-packages-gcc7.5.0/python-2.7.16/lib/python2.7/multiprocessing/pool.py:572:
>  in get
>     raise self._value
> E   AssertionError: wid: 2
> E   assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
> E     At index 3 diff: 2 != 3
> E     Left contains more items, first extra item: 4
> E     Full diff:
> E     - [0, 1, 2, 2, 3, 4]
> E     ?           ---
> E     + [0, 1, 2, 3, 4]
> ------------------------------------------------- Captured stderr setup 
> --------------------------------------------------
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21000
> -- connecting to localhost:21050 with impyla
> -- 2022-03-16 09:20:54,762 INFO     MainThread: Closing active operation
> -- connecting to localhost:28000 with impyla
> -- 2022-03-16 09:20:54,774 INFO     MainThread: Closing active operation
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET sync_ddl=True;
> -- executing against localhost:21000
> DROP DATABASE IF EXISTS `test_concurrent_inserts_8933345c` CASCADE;
> -- 2022-03-16 09:20:54,808 INFO     MainThread: Started query 
> 28457f4c7e77cdec:c6d3731900000000
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET sync_ddl=True;
> -- executing against localhost:21000
> CREATE DATABASE `test_concurrent_inserts_8933345c`;
> -- 2022-03-16 09:20:54,877 INFO     MainThread: Started query 
> 374bf99aea680523:48d2405400000000
> -- 2022-03-16 09:21:01,164 INFO     MainThread: Created database 
> "test_concurrent_inserts_8933345c" for test ID 
> "stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]"
> -------------------------------------------------- Captured stderr call 
> --------------------------------------------------
> SET SYNC_DDL=true;
> -- executing against localhost:21000
> drop table if exists test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:01,173 INFO     MainThread: Started query 
> 20480c2a1d336d35:c2d84edd00000000
> -- executing against localhost:21000
> create table test_concurrent_inserts_8933345c.test_concurrent_inserts (wid 
> int, i int) TBLPROPERTIES (
>         'transactional_properties' = 'insert_only', 'transactional' = 'true')
>         ;
> -- 2022-03-16 09:21:01,294 INFO     MainThread: Started query 
> 754969473483b4e9:acfc852300000000
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21000
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21001
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- executing against localhost:21000
> -- connecting to: localhost:21000
> -- connecting to: localhost:21002
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21001
> SET 
> client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
> -- connecting to: localhost:21002
> -- connecting to: localhost:21000
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (0, 0);
> -- executing against localhost:21001
> -- connecting to: localhost:21002
> -- executing against localhost:21002
> -- executing against localhost:21000
> -- connecting to: localhost:21001
> -- executing against localhost:21001
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (1, 0);
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (2, 0);
> -- executing against localhost:21000
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (3, 0);
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (4, 0);
> -- executing against localhost:21001
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (5, 0);
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:07,824 INFO     Thread-3: Started query 
> 6f4e9d38d1723252:a6a7368f00000000
> -- 2022-03-16 09:21:07,859 INFO     Thread-4: Started query 
> 33423e0145b7d3e0:ea8d626200000000
> -- 2022-03-16 09:21:07,861 INFO     Thread-8: Started query 
> 4046d8edde82931b:aa11c84800000000
> -- 2022-03-16 09:21:07,875 INFO     Thread-9: Started query 
> 594d63b7814c31ab:8f2b92ca00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (2, 1);
> -- 2022-03-16 09:21:08,229 INFO     Thread-4: Started query 
> a94dab601c125bb4:8988934300000000
> -- executing against localhost:21000
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (2, 2);
> -- 2022-03-16 09:21:08,384 INFO     Thread-3: Started query 
> 4040597eba7beb36:4cfa08c800000000
> -- 2022-03-16 09:21:08,409 INFO     Thread-4: Started query 
> 854e1aee63575861:dbf8bafb00000000
> -- executing against localhost:21002
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- executing against localhost:21001
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,435 INFO     Thread-9: Started query 
> 7e4d4cf54a44cf03:868efcb100000000
> -- 2022-03-16 09:21:08,456 INFO     Thread-8: Started query 
> 4645788cf2a4d401:0aea969e00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (2, 3);
> -- 2022-03-16 09:21:08,586 INFO     Thread-4: Started query 
> 4e4cdd976dfa3358:6c455b7300000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (2, 4);
> -- 2022-03-16 09:21:08,711 INFO     Thread-4: Started query 
> 6f46942eb8807e5f:ffbb146000000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (2, 5);
> -- executing against localhost:21000
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,959 INFO     Thread-3: Started query 
> d940fa405a776bef:3705218900000000
> -- 2022-03-16 09:21:08,977 INFO     Thread-4: Started query 
> f947061a7c45901c:12b2ba9d00000000
> -- executing against localhost:21001
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:08,997 INFO     Thread-9: Started query 
> f54a0aa01bbb2f86:3b89ae9600000000
> -- executing against localhost:21002
> select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
> -- 2022-03-16 09:21:09,147 INFO     Thread-8: Started query 
> fa418523bc0552ce:f0e49b1a00000000
> -- executing against localhost:21002
> insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
> values (2, 6);
> -- 2022-03-16 09:21:09,250 INFO     Thread-4: Started query 
> fc4751d49d1a1aa2:4bcb48d600000000
> -- closing connection to: localhost:21002
> Traceback (most recent call last):
>   File "/home/qchen/Impala.03112022/tests/stress/stress_util.py", line 35, in 
> run
>     return self.func(*self.args, **self.kwargs)
>   File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line 
> 276, in _impala_role_concurrent_checker
>     verify_result_set(result)
>   File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line 
> 269, in verify_result_set
>     assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), "wid: %d" 
> % wid
> AssertionError: wid: 2
> assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
>   At index 3 diff: 2 != 3
>   Left contains more items, first extra item: 4
>   Full diff:
>   - [0, 1, 2, 2, 3, 4]
>   ?           ---
>   + [0, 1, 2, 3, 4]
> ========================================== 1 failed, 1 passed in 158.88 
> seconds ==========================================
> [09:23:33 qchen@qifan-10229: Impala.03112022] git branch
>   IMPALA-10992-auto-scaling-planner-support
> * master
> [09:23:41 qchen@qifan-10229: Impala.03112022] git diff
> diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
> b/fe/src/main/java/org/apache/impala/service/Frontend.java
> index a921bb961..6df592f3f 100644
> --- a/fe/src/main/java/org/apache/impala/service/Frontend.java
> +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
> @@ -1644,6 +1644,7 @@ public class Frontend {
>          markTimelineRetries(attempt, retryMsg, timeline);
>          return req;
>        } catch (InconsistentMetadataFetchException e) {
> +        LOG.error("BAR: catch an InconsistentMetadataFetchException");
>          if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
>            markTimelineRetries(attempt, e.getMessage(), timeline);
>            throw e;
> @@ -1819,14 +1820,18 @@ public class Frontend {
>      } catch (Exception e) {
>        if (queryCtx.isSetTransaction_id()) {
>          try {
> +          LOG.error("BAR: to abort Transaction");
>            abortTransaction(queryCtx.getTransaction_id());
> +          LOG.error("BAR: Transaction aborted");
>            timeline.markEvent("Transaction aborted");
>          } catch (TransactionException te) {
>            LOG.error("Could not abort transaction because: " + 
> te.getMessage());
>          }
>        } else if (queryCtx.isIs_kudu_transactional()) {
>          try {
> +          LOG.error("BAR: to abort kudu Transaction");
>            abortKuduTransaction(queryCtx.getQuery_id());
> +          LOG.error("BAR: kudu Transaction aborted");
>            timeline.markEvent(
>                "Kudu transaction aborted: " + 
> queryCtx.getQuery_id().toString());
>          } catch (TransactionException te) {
> diff --git a/tests/stress/test_acid_stress.py 
> b/tests/stress/test_acid_stress.py
> index f6439ff3c..09ec874e7 100644
> --- a/tests/stress/test_acid_stress.py
> +++ b/tests/stress/test_acid_stress.py
> @@ -46,188 +46,188 @@ class TestAcidStress(ImpalaTestSuite):
>                     v.get_value('table_format').compression_codec == 'none'))
>  
>  
> -class TestAcidInsertsBasic(TestAcidStress):
> -  @classmethod
> -  def get_workload(self):
> -    return super(TestAcidInsertsBasic, self).get_workload()
> -
> -  @classmethod
> -  def add_test_dimensions(cls):
> -    super(TestAcidInsertsBasic, cls).add_test_dimensions()
> -
> -  def _verify_result(self, result, expected_result):
> -    """Verify invariants for 'run' and 'i'."""
> -    assert len(result.data) > 0
> -    run_max = -1
> -    i_list = []
> -    for line in result.data:
> -      [run, i] = map(int, (line.split('\t')))
> -      run_max = max(run_max, run)
> -      i_list.append(i)
> -    assert expected_result["run"] <= run_max  # shouldn't see data 
> overwritten in the past
> -    i_list.sort()
> -    if expected_result["run"] < run_max:
> -      expected_result["run"] = run_max
> -      expected_result["i"] = 0
> -      return
> -    assert i_list[-1] >= expected_result["i"]
> -    assert i_list == range(i_list[-1] + 1)  # 'i' should have all values 
> from 0 to max_i
> -    expected_result["i"] = i_list[-1]
> -
> -  def _hive_role_write_inserts(self, tbl_name, partitioned):
> -    """INSERT INTO/OVERWRITE a table several times from Hive."""
> -    part_expr = "partition (p=1)" if partitioned else ""
> -    for run in xrange(0, NUM_OVERWRITES):
> -      OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> -          """ % (tbl_name, part_expr, run, 0)
> -      self.run_stmt_in_hive(OVERWRITE_SQL)
> -      for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> -        INSERT_SQL = """insert into table %s %s values (%i, %i)
> -            """ % (tbl_name, part_expr, run, i)
> -        self.run_stmt_in_hive(INSERT_SQL)
> -
> -  def _impala_role_write_inserts(self, tbl_name, partitioned):
> -    """INSERT INTO/OVERWRITE a table several times from Impala."""
> -    try:
> -      impalad_client = ImpalaTestSuite.create_impala_client()
> -      part_expr = "partition (p=1)" if partitioned else ""
> -      for run in xrange(0, NUM_OVERWRITES + 1):
> -        OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> -            """ % (tbl_name, part_expr, run, 0)
> -        impalad_client.execute(OVERWRITE_SQL)
> -        for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> -          INSERT_SQL = """insert into table %s %s values (%i, %i)
> -              """ % (tbl_name, part_expr, run, i)
> -          impalad_client.execute(INSERT_SQL)
> -    finally:
> -      impalad_client.close()
> -
> -  def _impala_role_read_inserts(self, tbl_name, needs_refresh, 
> sleep_seconds):
> -    """SELECT from a table many times until the expected final values are 
> found."""
> -    try:
> -      impalad_client = ImpalaTestSuite.create_impala_client()
> -      expected_result = {"run": -1, "i": 0}
> -      accept_empty_table = True
> -      while expected_result["run"] != NUM_OVERWRITES and \
> -          expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
> -        time.sleep(sleep_seconds)
> -        if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
> -        result = impalad_client.execute("select run, i from %s" % tbl_name)
> -        if len(result.data) == 0:
> -          assert accept_empty_table
> -          continue
> -        accept_empty_table = False
> -        self._verify_result(result, expected_result)
> -    finally:
> -      impalad_client.close()
> -
> -  def _create_table(self, full_tbl_name, partitioned):
> -    """Creates test table with name 'full_tbl_name'. Table is partitioned if
> -    'partitioned' is set to True."""
> -    part_expr = "partitioned by (p int)" if partitioned else ""
> -
> -    CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
> -         'transactional_properties' = 'insert_only', 'transactional' = 
> 'true')
> -         """ % (full_tbl_name, part_expr)
> -    self.client.execute("drop table if exists %s" % full_tbl_name)
> -    self.client.execute(CREATE_SQL)
> -
> -  def _run_test_read_hive_inserts(self, unique_database, partitioned):
> -    """Check that Impala can read a single insert only ACID table 
> (over)written by Hive
> -    several times. Consistency can be checked by using incremental values for
> -    overwrites ('run') and inserts ('i').
> -    """
> -    tbl_name = "%s.test_read_hive_inserts" % unique_database
> -    self._create_table(tbl_name, partitioned)
> -
> -    run_tasks([
> -        Task(self._hive_role_write_inserts, tbl_name, partitioned),
> -        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
> -           sleep_seconds=3)])
> -
> -  def _run_test_read_impala_inserts(self, unique_database, partitioned):
> -    """Check that Impala can read a single insert only ACID table 
> (over)written by Hive
> -    several times. Consistency can be checked by using incremental values for
> -    overwrites ('run') and inserts ('i').
> -    """
> -    tbl_name = "%s.test_read_impala_inserts" % unique_database
> -    self._create_table(tbl_name, partitioned)
> -
> -    run_tasks([
> -        Task(self._impala_role_write_inserts, tbl_name, partitioned),
> -        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
> -           sleep_seconds=0.1)])
> -
> -  @SkipIfHive2.acid
> -  @SkipIfS3.hive
> -  @SkipIfGCS.hive
> -  @SkipIfCOS.hive
> -  @pytest.mark.execute_serially
> -  @pytest.mark.stress
> -  def test_read_hive_inserts(self, unique_database):
> -    """Check that Impala can read partitioned and non-partitioned ACID tables
> -    written by Hive."""
> -    for is_partitioned in [False, True]:
> -      self._run_test_read_hive_inserts(unique_database, is_partitioned)
> -
> -  @SkipIfHive2.acid
> -  @pytest.mark.execute_serially
> -  @pytest.mark.stress
> -  def test_read_impala_inserts(self, unique_database):
> -    """Check that Impala can read partitioned and non-partitioned ACID tables
> -    written by Hive."""
> -    for is_partitioned in [False, True]:
> -      self._run_test_read_impala_inserts(unique_database, is_partitioned)
> -
> -  def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, 
> sleep_sec):
> -    insert_op = "OVERWRITE" if is_overwrite else "INTO"
> -    try:
> -      impalad_client = ImpalaTestSuite.create_impala_client()
> -      impalad_client.execute(
> -          """insert {op} table {tbl_name} partition({partition})
> -             select sleep({sleep_ms})""".format(op=insert_op, 
> tbl_name=tbl_name,
> -          partition=partition, sleep_ms=sleep_sec * 1000))
> -    finally:
> -      impalad_client.close()
> -
> -  @pytest.mark.execute_serially
> -  @pytest.mark.stress
> -  @SkipIf.not_hdfs
> -  @UniqueDatabase.parametrize(sync_ddl=True)
> -  def test_partitioned_inserts(self, unique_database):
> -    """Check that the different ACID write operations take appropriate locks.
> -       INSERT INTO: should take a shared lock
> -       INSERT OVERWRITE: should take an exclusive lock
> -       Both should take PARTITION-level lock in case of static partition 
> insert."""
> -    tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
> -    self.client.set_configuration_option("SYNC_DDL", "true")
> -    self.client.execute("""
> -        CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
> -        TBLPROPERTIES(
> -        
> 'transactional_properties'='insert_only','transactional'='true')""".format(
> -        tbl_name))
> -    # Warmup INSERT
> -    self.execute_query("alter table {0} add 
> partition(p=0,q=0)".format(tbl_name))
> -    sleep_sec = 5
> -    task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
> -        "p=1,q=1", False, sleep_sec)
> -    # INSERT INTO the same partition can run in parallel.
> -    duration = run_tasks([task_insert_into, task_insert_into])
> -    assert duration < 3 * sleep_sec
> -    task_insert_overwrite = Task(self._impala_role_partition_writer, 
> tbl_name,
> -      "p=1,q=1", True, sleep_sec)
> -    # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
> -    duration = run_tasks([task_insert_into, task_insert_overwrite])
> -    assert duration > 4 * sleep_sec
> -    # INSERT OVERWRITEs to the same partition should have mutual exclusion.
> -    duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
> -    assert duration > 4 * sleep_sec
> -    task_insert_overwrite_2 = Task(self._impala_role_partition_writer, 
> tbl_name,
> -      "p=1,q=2", True, sleep_sec)
> -    # INSERT OVERWRITEs to different partitions can run in parallel.
> -    duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
> -    assert duration < 3 * sleep_sec
> -
> +#class TestAcidInsertsBasic(TestAcidStress):
> +#  @classmethod
> +#  def get_workload(self):
> +#    return super(TestAcidInsertsBasic, self).get_workload()
> +#
> +#  @classmethod
> +#  def add_test_dimensions(cls):
> +#    super(TestAcidInsertsBasic, cls).add_test_dimensions()
> +#
> +#  def _verify_result(self, result, expected_result):
> +#    """Verify invariants for 'run' and 'i'."""
> +#    assert len(result.data) > 0
> +#    run_max = -1
> +#    i_list = []
> +#    for line in result.data:
> +#      [run, i] = map(int, (line.split('\t')))
> +#      run_max = max(run_max, run)
> +#      i_list.append(i)
> +#    assert expected_result["run"] <= run_max  # shouldn't see data 
> overwritten in the past
> +#    i_list.sort()
> +#    if expected_result["run"] < run_max:
> +#      expected_result["run"] = run_max
> +#      expected_result["i"] = 0
> +#      return
> +#    assert i_list[-1] >= expected_result["i"]
> +#    assert i_list == range(i_list[-1] + 1)  # 'i' should have all values 
> from 0 to max_i
> +#    expected_result["i"] = i_list[-1]
> +#
> +#  def _hive_role_write_inserts(self, tbl_name, partitioned):
> +#    """INSERT INTO/OVERWRITE a table several times from Hive."""
> +#    part_expr = "partition (p=1)" if partitioned else ""
> +#    for run in xrange(0, NUM_OVERWRITES):
> +#      OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> +#          """ % (tbl_name, part_expr, run, 0)
> +#      self.run_stmt_in_hive(OVERWRITE_SQL)
> +#      for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> +#        INSERT_SQL = """insert into table %s %s values (%i, %i)
> +#            """ % (tbl_name, part_expr, run, i)
> +#        self.run_stmt_in_hive(INSERT_SQL)
> +#
> +#  def _impala_role_write_inserts(self, tbl_name, partitioned):
> +#    """INSERT INTO/OVERWRITE a table several times from Impala."""
> +#    try:
> +#      impalad_client = ImpalaTestSuite.create_impala_client()
> +#      part_expr = "partition (p=1)" if partitioned else ""
> +#      for run in xrange(0, NUM_OVERWRITES + 1):
> +#        OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
> +#            """ % (tbl_name, part_expr, run, 0)
> +#        impalad_client.execute(OVERWRITE_SQL)
> +#        for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
> +#          INSERT_SQL = """insert into table %s %s values (%i, %i)
> +#              """ % (tbl_name, part_expr, run, i)
> +#          impalad_client.execute(INSERT_SQL)
> +#    finally:
> +#      impalad_client.close()
> +#
> +#  def _impala_role_read_inserts(self, tbl_name, needs_refresh, 
> sleep_seconds):
> +#    """SELECT from a table many times until the expected final values are 
> found."""
> +#    try:
> +#      impalad_client = ImpalaTestSuite.create_impala_client()
> +#      expected_result = {"run": -1, "i": 0}
> +#      accept_empty_table = True
> +#      while expected_result["run"] != NUM_OVERWRITES and \
> +#          expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
> +#        time.sleep(sleep_seconds)
> +#        if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
> +#        result = impalad_client.execute("select run, i from %s" % tbl_name)
> +#        if len(result.data) == 0:
> +#          assert accept_empty_table
> +#          continue
> +#        accept_empty_table = False
> +#        self._verify_result(result, expected_result)
> +#    finally:
> +#      impalad_client.close()
> +#
> +#  def _create_table(self, full_tbl_name, partitioned):
> +#    """Creates test table with name 'full_tbl_name'. Table is partitioned if
> +#    'partitioned' is set to True."""
> +#    part_expr = "partitioned by (p int)" if partitioned else ""
> +#
> +#    CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
> +#         'transactional_properties' = 'insert_only', 'transactional' = 
> 'true')
> +#         """ % (full_tbl_name, part_expr)
> +#    self.client.execute("drop table if exists %s" % full_tbl_name)
> +#    self.client.execute(CREATE_SQL)
> +#
> +#  def _run_test_read_hive_inserts(self, unique_database, partitioned):
> +#    """Check that Impala can read a single insert only ACID table 
> (over)written by Hive
> +#    several times. Consistency can be checked by using incremental values 
> for
> +#    overwrites ('run') and inserts ('i').
> +#    """
> +#    tbl_name = "%s.test_read_hive_inserts" % unique_database
> +#    self._create_table(tbl_name, partitioned)
> +#
> +#    run_tasks([
> +#        Task(self._hive_role_write_inserts, tbl_name, partitioned),
> +#        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
> +#           sleep_seconds=3)])
> +#
> +#  def _run_test_read_impala_inserts(self, unique_database, partitioned):
> +#    """Check that Impala can read a single insert only ACID table 
> (over)written by Hive
> +#    several times. Consistency can be checked by using incremental values 
> for
> +#    overwrites ('run') and inserts ('i').
> +#    """
> +#    tbl_name = "%s.test_read_impala_inserts" % unique_database
> +#    self._create_table(tbl_name, partitioned)
> +#
> +#    run_tasks([
> +#        Task(self._impala_role_write_inserts, tbl_name, partitioned),
> +#        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
> +#           sleep_seconds=0.1)])
> +#
> +#  @SkipIfHive2.acid
> +#  @SkipIfS3.hive
> +#  @SkipIfGCS.hive
> +#  @SkipIfCOS.hive
> +#  @pytest.mark.execute_serially
> +#  @pytest.mark.stress
> +#  def test_read_hive_inserts(self, unique_database):
> +#    """Check that Impala can read partitioned and non-partitioned ACID 
> tables
> +#    written by Hive."""
> +#    for is_partitioned in [False, True]:
> +#      self._run_test_read_hive_inserts(unique_database, is_partitioned)
> +#
> +#  @SkipIfHive2.acid
> +#  @pytest.mark.execute_serially
> +#  @pytest.mark.stress
> +#  def test_read_impala_inserts(self, unique_database):
> +#    """Check that Impala can read partitioned and non-partitioned ACID 
> tables
> +#    written by Hive."""
> +#    for is_partitioned in [False, True]:
> +#      self._run_test_read_impala_inserts(unique_database, is_partitioned)
> +#
> +#  def _impala_role_partition_writer(self, tbl_name, partition, 
> is_overwrite, sleep_sec):
> +#    insert_op = "OVERWRITE" if is_overwrite else "INTO"
> +#    try:
> +#      impalad_client = ImpalaTestSuite.create_impala_client()
> +#      impalad_client.execute(
> +#          """insert {op} table {tbl_name} partition({partition})
> +#             select sleep({sleep_ms})""".format(op=insert_op, 
> tbl_name=tbl_name,
> +#          partition=partition, sleep_ms=sleep_sec * 1000))
> +#    finally:
> +#      impalad_client.close()
> +#
> +#  @pytest.mark.execute_serially
> +#  @pytest.mark.stress
> +#  @SkipIf.not_hdfs
> +#  @UniqueDatabase.parametrize(sync_ddl=True)
> +#  def test_partitioned_inserts(self, unique_database):
> +#    """Check that the different ACID write operations take appropriate 
> locks.
> +#       INSERT INTO: should take a shared lock
> +#       INSERT OVERWRITE: should take an exclusive lock
> +#       Both should take PARTITION-level lock in case of static partition 
> insert."""
> +#    tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
> +#    self.client.set_configuration_option("SYNC_DDL", "true")
> +#    self.client.execute("""
> +#        CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
> +#        TBLPROPERTIES(
> +#        
> 'transactional_properties'='insert_only','transactional'='true')""".format(
> +#        tbl_name))
> +#    # Warmup INSERT
> +#    self.execute_query("alter table {0} add 
> partition(p=0,q=0)".format(tbl_name))
> +#    sleep_sec = 5
> +#    task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
> +#        "p=1,q=1", False, sleep_sec)
> +#    # INSERT INTO the same partition can run in parallel.
> +#    duration = run_tasks([task_insert_into, task_insert_into])
> +#    assert duration < 3 * sleep_sec
> +#    task_insert_overwrite = Task(self._impala_role_partition_writer, 
> tbl_name,
> +#      "p=1,q=1", True, sleep_sec)
> +#    # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
> +#    duration = run_tasks([task_insert_into, task_insert_overwrite])
> +#    assert duration > 4 * sleep_sec
> +#    # INSERT OVERWRITEs to the same partition should have mutual exclusion.
> +#    duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
> +#    assert duration > 4 * sleep_sec
> +#    task_insert_overwrite_2 = Task(self._impala_role_partition_writer, 
> tbl_name,
> +#      "p=1,q=2", True, sleep_sec)
> +#    # INSERT OVERWRITEs to different partitions can run in parallel.
> +#    duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
> +#    assert duration < 3 * sleep_sec
> +#
>  
>  class TestConcurrentAcidInserts(TestAcidStress):
>    @classmethod
> (END)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to