deniskuzZ commented on code in PR #6290:
URL: https://github.com/apache/hive/pull/6290#discussion_r2841488972
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
//TODO: would be better to replace with MERGE or UPSERT
String command;
- if (dbEntityParam.key == null) {
+ if (dbEntityParam.key() == null) {
command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
- } else if (!dbEntityParam.value.equals(propValue)) {
+ } else if (!dbEntityParam.value().equals(propValue)) {
command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
} else {
LOG.info("Database property: {} with value: {} already updated for db:
{}", prop, propValue, db);
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating {} for table: {} using command {}", prop, table,
command);
}
SqlParameterSource params = new MapSqlParameterSource()
- .addValue("tblId", dbEntityParam.id)
+ .addValue("tblId", dbEntityParam.id())
.addValue("key", prop)
.addValue("value", propValue);
if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
//only one row insert or update should happen
throw new RuntimeException("TABLE_PARAMS is corrupted for table: " +
table);
}
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource,
long tableId,
- List<String> partList, String prop, String
propValue) {
+ List<String> partList, String prop, String propValue) {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
//language=SQL
- prefix.append(
- "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM
\"PARTITION_PARAMS\" pp\n" +
- "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+ prefix.append("""
+ SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM
"PARTITION_PARAMS" pp
+ RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
// Populate the complete query with provided prefix and suffix
TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries,
prefix, suffix, partList,
"\"PART_NAME\"", true, false);
+
SqlParameterSource params = new MapSqlParameterSource()
.addValue("tblId", tableId)
.addValue("key", prop);
+
+ RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+ new DbEntityParam(
+ rs.getLong("PART_ID"), rs.getString("PARAM_KEY"),
rs.getString("PARAM_VALUE")
+ );
List<DbEntityParam> partitionParams = new ArrayList<>();
- for(String query : queries) {
+
+ for (String query : queries) {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to execute query <" + query + ">");
}
- jdbcResource.getJdbcTemplate().query(query, params,
- (ResultSet rs) -> {
- while (rs.next()) {
- partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"),
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
- }
- });
+ partitionParams.addAll(
+ jdbcResource.getJdbcTemplate().query(query, params, mapper)
+ );
}
//TODO: would be better to replace with MERGE or UPSERT
int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(),
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
//all insert in one batch
int[][] inserts =
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
- partitionParams.stream().filter(p -> p.key ==
null).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() == null)
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
- ps.setLong(1, argument.id);
- ps.setString(2, argument.key);
+ ps.setLong(1, argument.id());
+ ps.setString(2, argument.key());
ps.setString(3, propValue);
});
//all update in one batch
int[][] updates
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\"
= ? AND \"PARAM_KEY\" = ?",
- partitionParams.stream().filter(p -> p.key != null &&
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() != null && !propValue.equals(p.value()))
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
ps.setString(1, propValue);
- ps.setLong(2, argument.id);
- ps.setString(3, argument.key);
+ ps.setLong(2, argument.id());
+ ps.setString(3, argument.key());
});
- if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() +
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+ int totalChanges =
+ Arrays.stream(inserts)
+ .flatMapToInt(IntStream::of)
+ .sum()
+ + Arrays.stream(updates)
+ .flatMapToInt(IntStream::of)
+ .sum();
+
+ if (totalChanges != partList.size()) {
throw new RuntimeException("PARTITION_PARAMS is corrupted, update
failed");
}
}
- private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource
jdbcResource, long txnid) throws MetaException {
- String writeConflictQuery =
jdbcResource.getSqlGenerator().addLimitClause(1,
- "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
- "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\",
\"COMMITTED\".\"WS_PARTITION\", " +
- "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\",
\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " +
- "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM
\"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
- "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
- //For partitioned table we always track writes at partition level
(never at table)
- //and for non partitioned - always at table level, thus the same table
should never
- //have entries with partition key and w/o
- "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL))
" +
- "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " +
//txns overlap; could replace ws_txnid
- // with txnid, though any decent DB should infer this
- "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has
rows we just inserted as
- // part of this commitTxn() op
- "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has
committed txns
- //U+U and U+D and D+D is a conflict and we don't currently track I in
WRITE_SET at all
- //it may seem like D+D should not be in conflict but consider 2
multi-stmt txns
- //where each does "delete X + insert X, where X is a row with the same
PK. This is
- //equivalent to an update of X but won't be in conflict unless D+D is
in conflict.
- //The same happens when Hive splits U=I+D early so it looks like 2
branches of a
- //multi-insert stmt (an Insert and a Delete branch). It also 'feels'
- // un-serializable to allow concurrent deletes
- "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
- "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
- LOG.debug("Going to execute query: <{}>", writeConflictQuery);
- return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
- new MapSqlParameterSource()
- .addValue("txnId", txnid)
- .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
- .addValue("opDelete", OperationType.DELETE.getSqlConst()),
- (ResultSet rs) -> {
- if(rs.next()) {
- return new WriteSetInfo(rs.getLong("WS_TXNID"),
rs.getLong("CUR_WS_COMMIT_ID"),
- rs.getLong("WS_COMMIT_ID"), rs.getString("CUR_OP"),
rs.getString("COMMITTED_OP"),
- rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"),
rs.getString("WS_PARTITION"));
- } else {
- return null;
- }
- });
- }
-
- private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource
jdbcResource, long txnid, char isUpdateDelete) {
Review Comment:
the same logical grouping idea applies here as well
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]