This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 650758ffd93b326e3bede4822d4d716d7b2aa852
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Wed May 10 19:01:36 2023 +0200

    IMPALA-12130: Table creation time is not set properly in lineage log for
    Kudu and Iceberg tables
    
    For CTAS statements that create Kudu/Iceberg tables, the lineage log
    was incomplete because it lacked the creation time of the newly
    created table. The information was missing because
    CatalogOpExecutor.createKuduTable() / createIcebergTable() did not set
    it in the TDdlExecResponse object. This patch adds the missing information.
    
    Testing
     * e2e test
    
    Change-Id: I6938938b1834809d5197a748c171e9a09e13906a
    Reviewed-on: http://gerrit.cloudera.org:8080/19868
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Gabor Kaszab <[email protected]>
---
 .../apache/impala/analysis/ColumnLineageGraph.java |  2 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 30 ++++++++++++--
 tests/custom_cluster/test_lineage.py               | 47 ++++++++++++++++------
 3 files changed, 62 insertions(+), 17 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
index 9ba1f6371..75900cb18 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
@@ -497,7 +497,7 @@ public class ColumnLineageGraph {
               feTable.getMetaStoreTable().getCreateTime());
         } else {
           // -1 is just a placeholder that will be updated after the table/view has been
-          // created. See impala-server.cc (LogLineageRecord) for more information.
+          // created. See client-request-state.cc (LogLineageRecord) for more information.
           metadata = new Metadata(target.tableName_.toString(), -1);
         }
       }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 73bd44474..b7726c5e4 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -3495,6 +3495,10 @@ public class CatalogOpExecutor {
       Pair<Long, org.apache.hadoop.hive.metastore.api.Table> eventTblPair =
           getTableFromEvents(events, params.if_not_exists);
       createEventId = eventTblPair == null ? -1 : eventTblPair.first;
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          eventTblPair == null ? null : eventTblPair.second;
+      setTableNameAndCreateTimeInResponse(msTable,
+          newTable.getDbName(), newTable.getTableName(), response);
 
       // Add the table to the catalog cache
       Table newTbl = catalog_
@@ -3574,9 +3578,8 @@ public class CatalogOpExecutor {
               .getTable(newTable.getDbName(), newTable.getTableName()));
         }
         msTable = eventIdTblPair.second;
-        long tableCreateTime = msTable.getCreateTime();
-        response.setTable_name(newTable.getDbName() + "." + 
newTable.getTableName());
-        response.setTable_create_time(tableCreateTime);
+        setTableNameAndCreateTimeInResponse(msTable, newTable.getDbName(),
+            newTable.getTableName(), response);
         // For external tables set table location needed for lineage generation.
         if (newTable.getTableType() == TableType.EXTERNAL_TABLE.toString()) {
           String tableLocation = newTable.getSd().getLocation();
@@ -3634,6 +3637,23 @@ public class CatalogOpExecutor {
     return true;
   }
 
+  /**
+   * Sets table name and creation time in 'response' based on 'msTable'.
+   * If 'msTable' is null, then it loads the table from HMS.
+   * Throws exception if table is not found.
+   */
+  private void setTableNameAndCreateTimeInResponse(
+      org.apache.hadoop.hive.metastore.api.Table msTable, String dbName, 
String tblName,
+      TDdlExecResponse response) throws org.apache.thrift.TException {
+    if (msTable == null) {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        msTable = msClient.getHiveClient().getTable(dbName, tblName);
+      }
+    }
+    response.setTable_name(dbName + "." + tblName);
+    response.setTable_create_time(msTable.getCreateTime());
+  }
+
   /**
    * Creates a new view in the metastore and adds an entry to the metadata cache to
    * lazily load the new metadata on the next access. Re-throws any Metastore
@@ -3762,6 +3782,10 @@ public class CatalogOpExecutor {
       Pair<Long, org.apache.hadoop.hive.metastore.api.Table> eventTblPair
           = getTableFromEvents(events, ifNotExists);
       long createEventId = eventTblPair == null ? -1 : eventTblPair.first;
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          eventTblPair == null ? null : eventTblPair.second;
+      setTableNameAndCreateTimeInResponse(msTable,
+          newTable.getDbName(), newTable.getTableName(), response);
       // Add the table to the catalog cache
       Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
           newTable.getTableName(), TImpalaTableType.TABLE, tblComment,
diff --git a/tests/custom_cluster/test_lineage.py b/tests/custom_cluster/test_lineage.py
index 6551d545e..a670b428f 100644
--- a/tests/custom_cluster/test_lineage.py
+++ b/tests/custom_cluster/test_lineage.py
@@ -82,11 +82,25 @@ class TestLineage(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
                                     .format(CREATE_TABLE_TIME_LINEAGE_LOG_DIR))
-  def test_create_table_timestamp(self, vector, unique_database):
+  def test_create_table_timestamp(self, unique_database):
+    for table_format in ['textfile', 'kudu', 'iceberg']:
+      self.run_test_create_table_timestamp(unique_database, table_format)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      "--lineage_event_log_dir={0}"
+      .format(CREATE_TABLE_TIME_LINEAGE_LOG_DIR),
+      catalogd_args="--hms_event_polling_interval_s=0")
+  def test_create_table_timestamp_without_hms_events(self, unique_database):
+    for table_format in ['textfile', 'kudu', 'iceberg']:
+      self.run_test_create_table_timestamp(unique_database, table_format)
+
+  def run_test_create_table_timestamp(self, unique_database, table_format):
     """Test that 'createTableTime' in the lineage graph are populated with 
valid value
        from HMS."""
-    query = "create table {0}.lineage_test_tbl as select int_col, tinyint_col 
" \
-            "from functional.alltypes".format(unique_database)
+    query = "create table {0}.lineage_test_tbl_{1} primary key (int_col) 
stored as {1} " \
+            "as select int_col, bigint_col from functional.alltypes".format(
+                unique_database, table_format)
     result = self.execute_query_expect_success(self.client, query)
     profile_query_id = re.search("Query \(id=(.*)\):", 
result.runtime_profile).group(1)
 
@@ -98,16 +112,23 @@ class TestLineage(CustomClusterTestSuite):
       # Only the coordinator's log file will be populated.
       if os.path.getsize(log_path) > 0:
         with open(log_path) as log_file:
-          lineage_json = json.load(log_file)
-          assert lineage_json["queryId"] == profile_query_id
-          vertices = lineage_json["vertices"]
-          for vertex in vertices:
-            if vertex["vertexId"] == "int_col":
-              assert "metadata" in vertex
-              table_name = vertex["metadata"]["tableName"]
-              table_create_time = int(vertex["metadata"]["tableCreateTime"])
-              assert "{0}.lineage_test_tbl".format(unique_database) == 
table_name
-              assert table_create_time != -1
+          for line in log_file:
+            # Now that the test is executed multiple times we need to take a look at
+            # only the line that contains the expected table name.
+            expected_table_name =\
+                "{0}.lineage_test_tbl_{1}".format(unique_database, 
table_format)
+            if expected_table_name not in line: continue
+
+            lineage_json = json.loads(line)
+            assert lineage_json["queryId"] == profile_query_id
+            vertices = lineage_json["vertices"]
+            for vertex in vertices:
+              if vertex["vertexId"] == "int_col":
+                assert "metadata" in vertex
+                table_name = vertex["metadata"]["tableName"]
+                table_create_time = int(vertex["metadata"]["tableCreateTime"])
+                assert expected_table_name == table_name
+                assert table_create_time != -1
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"

Reply via email to